Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 13.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from typing import Dict, List, Set, Tuple
    
    import grpc, logging
    from prometheus_client import Counter, Histogram
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.Checkers import chk_string
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.database.api.Database import Database
    from common.exceptions.ServiceException import ServiceException
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.proto.context_pb2 import Empty, Link, LinkId, Topology
    
    from context.proto.context_pb2_grpc import ContextServiceServicer
    
    # Module-level logger, named after this module.
    LOGGER = logging.getLogger(__name__)

    # ----- Prometheus metrics for the GetTopology RPC ----------------------------------------------------------------
    GETTOPOLOGY_COUNTER_STARTED = Counter(
        'context_gettopology_counter_started',
        'Context:GetTopology counter of requests started',
    )
    GETTOPOLOGY_COUNTER_COMPLETED = Counter(
        'context_gettopology_counter_completed',
        'Context:GetTopology counter of requests completed',
    )
    GETTOPOLOGY_COUNTER_FAILED = Counter(
        'context_gettopology_counter_failed',
        'Context:GetTopology counter of requests failed',
    )
    GETTOPOLOGY_HISTOGRAM_DURATION = Histogram(
        'context_gettopology_histogram_duration',
        'Context:GetTopology histogram of request duration',
    )

    # ----- Prometheus metrics for the AddLink RPC --------------------------------------------------------------------
    ADDLINK_COUNTER_STARTED = Counter(
        'context_addlink_counter_started',
        'Context:AddLink counter of requests started',
    )
    ADDLINK_COUNTER_COMPLETED = Counter(
        'context_addlink_counter_completed',
        'Context:AddLink counter of requests completed',
    )
    ADDLINK_COUNTER_FAILED = Counter(
        'context_addlink_counter_failed',
        'Context:AddLink counter of requests failed',
    )
    ADDLINK_HISTOGRAM_DURATION = Histogram(
        'context_addlink_histogram_duration',
        'Context:AddLink histogram of request duration',
    )

    # ----- Prometheus metrics for the DeleteLink RPC -----------------------------------------------------------------
    DELETELINK_COUNTER_STARTED = Counter(
        'context_deletelink_counter_started',
        'Context:DeleteLink counter of requests started',
    )
    DELETELINK_COUNTER_COMPLETED = Counter(
        'context_deletelink_counter_completed',
        'Context:DeleteLink counter of requests completed',
    )
    DELETELINK_COUNTER_FAILED = Counter(
        'context_deletelink_counter_failed',
        'Context:DeleteLink counter of requests failed',
    )
    DELETELINK_HISTOGRAM_DURATION = Histogram(
        'context_deletelink_histogram_duration',
        'Context:DeleteLink histogram of request duration',
    )
    
    
    class ContextServiceServicerImpl(ContextServiceServicer):
        """gRPC servicer implementing GetTopology, AddLink and DeleteLink.

        All handlers operate on the default context (``DEFAULT_CONTEXT_ID``) and
        default topology (``DEFAULT_TOPOLOGY_ID``) of the database passed to the
        constructor.

        Each handler increments its ``*_COUNTER_STARTED`` metric on entry and its
        ``*_COUNTER_COMPLETED`` metric right before returning a reply. Unexpected
        (non-``ServiceException``) errors are logged, increment
        ``*_COUNTER_FAILED`` and abort the RPC with ``StatusCode.INTERNAL``.
        Handlers are wrapped by the matching ``*_HISTOGRAM_DURATION.time()``
        decorator.

        NOTE(review): aborts caused by ``ServiceException`` do not increment the
        ``*_COUNTER_FAILED`` metrics -- confirm this is intended.
        """

        def __init__(self, database : Database):
            """Keep a reference to the database used by every RPC handler.

            Args:
                database: database API object the handlers read from and write to.
            """
            LOGGER.debug('Creating Servicer...')
            self.database = database
            LOGGER.debug('Servicer Created')

        @GETTOPOLOGY_HISTOGRAM_DURATION.time()
        def GetTopology(self, request : Empty, grpc_context : grpc.ServicerContext) -> Topology:
            """Return the default topology of the default context.

            Args:
                request: empty request message (only logged).
                grpc_context: gRPC context, used to abort the RPC on errors.

            Returns:
                A ``Topology`` message built from the dump of the default topology.
            """
            GETTOPOLOGY_COUNTER_STARTED.inc()
            try:
                LOGGER.debug('GetTopology request: {}'.format(str(request)))

                # ----- Validate request data and pre-conditions -----------------------------------------------------------
                # NOTE(review): create() is presumably get-or-create for the default
                # context/topology -- confirm against the Database API.
                db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
                db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

                # ----- Retrieve data from the database --------------------------------------------------------------------
                json_topology = db_topology.dump()

                # ----- Compose reply --------------------------------------------------------------------------------------
                reply = Topology(**json_topology)

                LOGGER.debug('GetTopology reply: {}'.format(str(reply)))
                GETTOPOLOGY_COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:                               # pragma: no cover (ServiceException not thrown)
                grpc_context.abort(e.code, e.details)                   # pragma: no cover (ServiceException not thrown)
            except Exception as e:                                      # pragma: no cover
                LOGGER.exception('GetTopology exception')               # pragma: no cover
                GETTOPOLOGY_COUNTER_FAILED.inc()                        # pragma: no cover
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

        @ADDLINK_HISTOGRAM_DURATION.time()
        def AddLink(self, request : Link, grpc_context : grpc.ServicerContext) -> LinkId:
            """Validate a link and its endpoints, then store it in the default topology.

            Validation performed before anything is written:
              * the link UUID must be a non-empty string and must not already
                exist in the topology (``ALREADY_EXISTS``);
              * for every endpoint, the context and topology UUIDs may be empty
                (defaults are used) but otherwise must equal the defaults
                (``INVALID_ARGUMENT``);
              * the device and port UUIDs must be non-empty, a device may appear
                only once per link (``INVALID_ARGUMENT``), and both the device and
                the port must exist in the database (``NOT_FOUND``).

            Args:
                request: ``Link`` message with ``link_id`` and ``endpointList``.
                grpc_context: gRPC context, used to abort the RPC on errors.

            Returns:
                A ``LinkId`` message built from the created link's ``dump_id()``.
            """
            ADDLINK_COUNTER_STARTED.inc()
            try:
                LOGGER.debug('AddLink request: {}'.format(str(request)))

                # ----- Validate request data and pre-conditions -----------------------------------------------------------
                try:
                    link_id = chk_string('link.link_id.link_id.uuid',
                                         request.link_id.link_id.uuid,
                                         allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    # Chain the original error so the root cause is kept in tracebacks.
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

                db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
                db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

                if db_topology.links.contains(link_id):
                    msg = 'Link({}) already exists in the database.'
                    msg = msg.format(link_id)
                    raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

                # device uuid -> set of port uuids already accepted for this link
                added_devices_and_endpoints : Dict[str, Set[str]] = {}
                # (device uuid, port uuid) pairs in request order, written to the database after validation
                device_endpoint_pairs : List[Tuple[str, str]] = []
                for i,endpoint in enumerate(request.endpointList):
                    try:
                        ep_context_id  = chk_string('endpoint[#{}].topoId.contextId.contextUuid.uuid'.format(i),
                                                    endpoint.topoId.contextId.contextUuid.uuid,
                                                    allow_empty=True)
                        ep_topology_id = chk_string('endpoint[#{}].topoId.topoId.uuid'.format(i),
                                                    endpoint.topoId.topoId.uuid,
                                                    allow_empty=True)
                        ep_device_id   = chk_string('endpoint[#{}].dev_id.device_id.uuid'.format(i),
                                                    endpoint.dev_id.device_id.uuid,
                                                    allow_empty=False)
                        ep_port_id     = chk_string('endpoint[#{}].port_id.uuid'.format(i),
                                                    endpoint.port_id.uuid,
                                                    allow_empty=False)
                    except Exception as e:
                        LOGGER.exception('Invalid arguments:')
                        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

                    # Only the default context is supported; an empty value means "use the default".
                    if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
                        msg = ' '.join([
                            'Unsupported Context({}) in Endpoint(#{}) of Link({}).',
                            'Only default Context({}) is currently supported.',
                            'Optionally, leave field empty to use default Context.',
                        ])
                        msg = msg.format(ep_context_id, i, link_id, DEFAULT_CONTEXT_ID)
                        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                    elif len(ep_context_id) == 0:
                        ep_context_id = DEFAULT_CONTEXT_ID

                    # Same rule for the topology.
                    if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
                        msg = ' '.join([
                            'Unsupported Topology({}) in Endpoint(#{}) of Link({}).',
                            'Only default Topology({}) is currently supported.',
                            'Optionally, leave field empty to use default Topology.',
                        ])
                        msg = msg.format(ep_topology_id, i, link_id, DEFAULT_TOPOLOGY_ID)
                        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                    elif len(ep_topology_id) == 0:
                        ep_topology_id = DEFAULT_TOPOLOGY_ID

                    # A device may contribute at most one endpoint to a link.
                    if ep_device_id in added_devices_and_endpoints:
                        msg = 'Duplicated Device({}) in Endpoint(#{}) of Link({}).'
                        msg = msg.format(ep_device_id, i, link_id)
                        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

                    if not db_topology.devices.contains(ep_device_id):
                        msg = ' '.join([
                            'Context({})/Topology({})/Device({}) in Endpoint(#{}) of Link({})',
                            'does not exist in the database.',
                        ])
                        # This template has five placeholders and no Port field, so the
                        # port id must not be passed: it would shift the endpoint index
                        # and link id into the wrong placeholders.
                        msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, i, link_id)
                        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                    added_device_and_endpoints = added_devices_and_endpoints.setdefault(ep_device_id, set())

                    # should never happen since same device cannot appear 2 times in the link
                    if ep_port_id in added_device_and_endpoints:                                # pragma: no cover
                        msg = 'Duplicated Device({})/Port({}) in Endpoint(#{}) of Link({}).'    # pragma: no cover
                        msg = msg.format(ep_device_id, ep_port_id, i, link_id)                  # pragma: no cover
                        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)           # pragma: no cover

                    if not db_topology.device(ep_device_id).endpoints.contains(ep_port_id):
                        msg = ' '.join([
                            'Context({})/Topology({})/Device({})/Port({}) in Endpoint(#{}) of Link({})',
                            'does not exist in the database.',
                        ])
                        msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, ep_port_id, i, link_id)
                        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                    added_device_and_endpoints.add(ep_port_id)
                    device_endpoint_pairs.append((ep_device_id, ep_port_id))

                # ----- Implement changes in the database ------------------------------------------------------------------
                db_link = db_topology.link(link_id).create()
                for device_id,endpoint_id in device_endpoint_pairs:
                    # Link endpoints are keyed as '<device uuid>/<port uuid>'.
                    link_endpoint_id = '{}/{}'.format(device_id, endpoint_id)
                    db_endpoint = db_topology.device(device_id).endpoint(endpoint_id)
                    db_link.endpoint(link_endpoint_id).create(db_endpoint)

                # ----- Compose reply --------------------------------------------------------------------------------------
                reply = LinkId(**db_link.dump_id())
                LOGGER.debug('AddLink reply: {}'.format(str(reply)))
                ADDLINK_COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:
                grpc_context.abort(e.code, e.details)
            except Exception as e:                                      # pragma: no cover
                LOGGER.exception('AddLink exception')                   # pragma: no cover
                ADDLINK_COUNTER_FAILED.inc()                            # pragma: no cover
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

        @DELETELINK_HISTOGRAM_DURATION.time()
        def DeleteLink(self, request : LinkId, grpc_context : grpc.ServicerContext) -> Empty:
            """Delete a link from the default topology.

            The link UUID must be a non-empty string (``INVALID_ARGUMENT``) and the
            link must exist in the topology (``NOT_FOUND``).

            Args:
                request: ``LinkId`` message identifying the link to remove.
                grpc_context: gRPC context, used to abort the RPC on errors.

            Returns:
                An ``Empty`` message.
            """
            DELETELINK_COUNTER_STARTED.inc()
            try:
                LOGGER.debug('DeleteLink request: {}'.format(str(request)))

                # ----- Validate request data and pre-conditions -----------------------------------------------------------
                try:
                    link_id = chk_string('link_id.link_id.uuid',
                                         request.link_id.uuid,
                                         allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    # Chain the original error so the root cause is kept in tracebacks.
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

                db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
                db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

                if not db_topology.links.contains(link_id):
                    msg = 'Link({}) does not exist in the database.'
                    msg = msg.format(link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                # ----- Implement changes in the database ------------------------------------------------------------------
                db_topology.link(link_id).delete()

                # ----- Compose reply --------------------------------------------------------------------------------------
                reply = Empty()
                LOGGER.debug('DeleteLink reply: {}'.format(str(reply)))
                DELETELINK_COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:
                grpc_context.abort(e.code, e.details)
            except Exception as e:                                      # pragma: no cover
                LOGGER.exception('DeleteLink exception')                # pragma: no cover
                DELETELINK_COUNTER_FAILED.inc()                         # pragma: no cover
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover