Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
ContextServiceServicerImpl.py 12.9 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict, List, Set, Tuple
import grpc, logging
from prometheus_client import Counter, Histogram
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Checkers import chk_string
from common.database.api.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.database.api.Database import Database
from common.exceptions.ServiceException import ServiceException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import Empty, Link, LinkId, Topology
from context.proto.context_pb2_grpc import ContextServiceServicer

# Module-level logger shared by all RPC handlers in this file.
LOGGER = logging.getLogger(__name__)

# Prometheus metrics for the GetTopology RPC.
GETTOPOLOGY_COUNTER_STARTED = Counter(
    'context_gettopology_counter_started',
    'Context:GetTopology counter of requests started',
)
GETTOPOLOGY_COUNTER_COMPLETED = Counter(
    'context_gettopology_counter_completed',
    'Context:GetTopology counter of requests completed',
)
GETTOPOLOGY_COUNTER_FAILED = Counter(
    'context_gettopology_counter_failed',
    'Context:GetTopology counter of requests failed',
)
GETTOPOLOGY_HISTOGRAM_DURATION = Histogram(
    'context_gettopology_histogram_duration',
    'Context:GetTopology histogram of request duration',
)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Prometheus metrics for the AddLink RPC.
ADDLINK_COUNTER_STARTED = Counter(
    'context_addlink_counter_started',
    'Context:AddLink counter of requests started',
)
ADDLINK_COUNTER_COMPLETED = Counter(
    'context_addlink_counter_completed',
    'Context:AddLink counter of requests completed',
)
ADDLINK_COUNTER_FAILED = Counter(
    'context_addlink_counter_failed',
    'Context:AddLink counter of requests failed',
)
ADDLINK_HISTOGRAM_DURATION = Histogram(
    'context_addlink_histogram_duration',
    'Context:AddLink histogram of request duration',
)

# Prometheus metrics for the DeleteLink RPC.
DELETELINK_COUNTER_STARTED = Counter(
    'context_deletelink_counter_started',
    'Context:DeleteLink counter of requests started',
)
DELETELINK_COUNTER_COMPLETED = Counter(
    'context_deletelink_counter_completed',
    'Context:DeleteLink counter of requests completed',
)
DELETELINK_COUNTER_FAILED = Counter(
    'context_deletelink_counter_failed',
    'Context:DeleteLink counter of requests failed',
)
DELETELINK_HISTOGRAM_DURATION = Histogram(
    'context_deletelink_histogram_duration',
    'Context:DeleteLink histogram of request duration',
)

class ContextServiceServicerImpl(ContextServiceServicer):
    """gRPC servicer exposing topology retrieval and link management.

    Every RPC works on the default context and default topology
    (``DEFAULT_CONTEXT_ID`` / ``DEFAULT_TOPOLOGY_ID``) of the injected database.

    Metrics: each RPC increments its *started* counter on entry and its
    *completed* counter right before returning a reply. The *failed* counter is
    incremented only for unexpected exceptions; a ``ServiceException`` is
    forwarded to the client through ``grpc_context.abort()`` without touching
    the *failed* counter. The per-RPC histogram is applied as a decorator to
    time the whole call.
    """

    def __init__(self, database : Database):
        """Store the database handle used by all RPC handlers.

        Args:
            database: database object the RPCs read from and write to.
        """
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    @GETTOPOLOGY_HISTOGRAM_DURATION.time()
    def GetTopology(self, request : Empty, grpc_context : grpc.ServicerContext) -> Topology:
        """Return the dump of the default topology as a ``Topology`` message.

        Args:
            request: empty request message (only logged).
            grpc_context: gRPC context used to abort the call on error.

        Returns:
            A ``Topology`` message built from ``db_topology.dump()``.

        On any exception the call is terminated through ``grpc_context.abort()``
        (``INTERNAL`` for unexpected exceptions).
        """
        GETTOPOLOGY_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('GetTopology request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            # NOTE(review): create() presumably returns the existing entity when it is already present -- confirm
            # against the Database API.
            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            # ----- Retrieve data from the database --------------------------------------------------------------------
            json_topology = db_topology.dump()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Topology(**json_topology)
            LOGGER.debug('GetTopology reply: {}'.format(str(reply)))
            GETTOPOLOGY_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:                               # pragma: no cover (ServiceException not thrown)
            grpc_context.abort(e.code, e.details)                   # pragma: no cover (ServiceException not thrown)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('GetTopology exception')               # pragma: no cover
            GETTOPOLOGY_COUNTER_FAILED.inc()                        # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @ADDLINK_HISTOGRAM_DURATION.time()
    def AddLink(self, request : Link, grpc_context : grpc.ServicerContext) -> LinkId:
        """Validate a ``Link`` request and store the link in the default topology.

        Validation performed before anything is written:
          * the link UUID must be a non-empty string and must not already exist
            in the topology (``ALREADY_EXISTS`` otherwise);
          * each endpoint may leave context/topology empty or set them to the
            default identifiers; any other value is ``INVALID_ARGUMENT``;
          * each endpoint must name a non-empty device and port;
          * a device may appear only once in the link (``INVALID_ARGUMENT``);
          * the device and the port must exist in the topology (``NOT_FOUND``).

        Args:
            request: link to be added, including its list of endpoints.
            grpc_context: gRPC context used to abort the call on error.

        Returns:
            A ``LinkId`` message built from ``db_link.dump_id()``.

        Validation failures are raised internally as ``ServiceException`` and
        forwarded to the client via ``grpc_context.abort()``; unexpected
        exceptions abort the call with ``INTERNAL``.
        """
        ADDLINK_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('AddLink request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                link_id = chk_string('link.link_id.link_id.uuid',
                                     request.link_id.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if db_topology.links.contains(link_id):
                msg = 'Link({}) already exists in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            # device uuid -> set of port uuids already accepted for this link
            added_devices_and_endpoints : Dict[str, Set[str]] = {}
            # validated (device uuid, port uuid) pairs, in request order
            device_endpoint_pairs : List[Tuple[str, str]] = []
            for i,endpoint in enumerate(request.endpointList):
                try:
                    ep_context_id  = chk_string('endpoint[#{}].topoId.contextId.contextUuid.uuid'.format(i),
                                                endpoint.topoId.contextId.contextUuid.uuid,
                                                allow_empty=True)
                    ep_topology_id = chk_string('endpoint[#{}].topoId.topoId.uuid'.format(i),
                                                endpoint.topoId.topoId.uuid,
                                                allow_empty=True)
                    ep_device_id   = chk_string('endpoint[#{}].dev_id.device_id.uuid'.format(i),
                                                endpoint.dev_id.device_id.uuid,
                                                allow_empty=False)
                    ep_port_id     = chk_string('endpoint[#{}].port_id.uuid'.format(i),
                                                endpoint.port_id.uuid,
                                                allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

                # An empty context/topology means "use the default one"; anything else must match the default.
                if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
                    msg = ' '.join([
                        'Unsupported Context({}) in Endpoint(#{}) of Link({}).',
                        'Only default Context({}) is currently supported.',
                        'Optionally, leave field empty to use default Context.',
                    ])
                    msg = msg.format(ep_context_id, i, link_id, DEFAULT_CONTEXT_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_context_id) == 0:
                    ep_context_id = DEFAULT_CONTEXT_ID

                if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
                    msg = ' '.join([
                        'Unsupported Topology({}) in Endpoint(#{}) of Link({}).',
                        'Only default Topology({}) is currently supported.',
                        'Optionally, leave field empty to use default Topology.',
                    ])
                    msg = msg.format(ep_topology_id, i, link_id, DEFAULT_TOPOLOGY_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_topology_id) == 0:
                    ep_topology_id = DEFAULT_TOPOLOGY_ID

                if ep_device_id in added_devices_and_endpoints:
                    msg = 'Duplicated Device({}) in Endpoint(#{}) of Link({}).'
                    msg = msg.format(ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

                if not db_topology.devices.contains(ep_device_id):
                    msg = 'Device({}) in Endpoint(#{}) of Link({}) does not exist in the database.'
                    msg = msg.format(ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints = added_devices_and_endpoints.setdefault(ep_device_id, set())

                # should never happen since same device cannot appear 2 times in the link
                if ep_port_id in added_device_and_endpoints:                                # pragma: no cover
                    msg = 'Duplicated Device({})/Port({}) in Endpoint(#{}) of Link({}).'    # pragma: no cover
                    msg = msg.format(ep_device_id, ep_port_id, i, link_id)                  # pragma: no cover
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)           # pragma: no cover

                if not db_topology.device(ep_device_id).endpoints.contains(ep_port_id):
                    msg = 'Device({})/Port({}) in Endpoint(#{}) of Link({}) does not exist in the database.'
                    msg = msg.format(ep_device_id, ep_port_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints.add(ep_port_id)
                device_endpoint_pairs.append((ep_device_id, ep_port_id))

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_link = db_topology.link(link_id).create()
            for device_id,endpoint_id in device_endpoint_pairs:
                link_endpoint_id = '{}/{}'.format(device_id, endpoint_id)
                # Resolve the endpoint of THIS pair. The validation-loop variables (ep_device_id/ep_port_id) still
                # hold the last endpoint of the request and must not be used here.
                db_endpoint = db_topology.device(device_id).endpoint(endpoint_id)
                db_link.endpoint(link_endpoint_id).create(db_endpoint)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = LinkId(**db_link.dump_id())
            LOGGER.debug('AddLink reply: {}'.format(str(reply)))
            ADDLINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddLink exception')                   # pragma: no cover
            ADDLINK_COUNTER_FAILED.inc()                            # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @DELETELINK_HISTOGRAM_DURATION.time()
    def DeleteLink(self, request : LinkId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete an existing link from the default topology.

        Args:
            request: identifier of the link to delete; its UUID must be a
                non-empty string (``INVALID_ARGUMENT`` otherwise).
            grpc_context: gRPC context used to abort the call on error.

        Returns:
            An ``Empty`` message once the link has been deleted.

        The call is aborted with ``NOT_FOUND`` when the link is not in the
        topology, and with ``INTERNAL`` on unexpected exceptions.
        """
        DELETELINK_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('DeleteLink request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                link_id = chk_string('link_id.link_id.uuid',
                                     request.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.links.contains(link_id):
                msg = 'Link({}) does not exist in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_topology.link(link_id).delete()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteLink reply: {}'.format(str(reply)))
            DELETELINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteLink exception')                # pragma: no cover
            DELETELINK_COUNTER_FAILED.inc()                         # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover