# Skip to content
# Snippets Groups Projects
# ContextServiceServicerImpl.py 13.2 KiB
# Newer Older
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
from typing import Dict, List, Set, Tuple
import grpc, logging
from prometheus_client import Counter, Histogram
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
from common.Checkers import chk_string
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
from common.database.api.Database import Database
from common.exceptions.ServiceException import ServiceException
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
from context.proto.context_pb2 import Empty, Link, LinkId, Topology
from context.proto.context_pb2_grpc import ContextServiceServicer

# Module-level logger shared by every RPC handler in this file.
LOGGER = logging.getLogger(__name__)

# Prometheus metrics for the GetTopology RPC: started/completed/failed request counters
# plus a histogram of the request duration.
GETTOPOLOGY_COUNTER_STARTED    = Counter  ('context_gettopology_counter_started',
                                          'Context:GetTopology counter of requests started'  )
GETTOPOLOGY_COUNTER_COMPLETED  = Counter  ('context_gettopology_counter_completed',
                                          'Context:GetTopology counter of requests completed')
GETTOPOLOGY_COUNTER_FAILED     = Counter  ('context_gettopology_counter_failed',
                                          'Context:GetTopology counter of requests failed'   )
GETTOPOLOGY_HISTOGRAM_DURATION = Histogram('context_gettopology_histogram_duration',
                                          'Context:GetTopology histogram of request duration')

# Prometheus metrics for the AddLink RPC (same four-metric layout as above).
ADDLINK_COUNTER_STARTED    = Counter  ('context_addlink_counter_started',
                                       'Context:AddLink counter of requests started'  )
ADDLINK_COUNTER_COMPLETED  = Counter  ('context_addlink_counter_completed',
                                       'Context:AddLink counter of requests completed')
ADDLINK_COUNTER_FAILED     = Counter  ('context_addlink_counter_failed',
                                       'Context:AddLink counter of requests failed'   )
ADDLINK_HISTOGRAM_DURATION = Histogram('context_addlink_histogram_duration',
                                       'Context:AddLink histogram of request duration')

# Prometheus metrics for the DeleteLink RPC (same four-metric layout as above).
DELETELINK_COUNTER_STARTED    = Counter  ('context_deletelink_counter_started',
                                          'Context:DeleteLink counter of requests started'  )
DELETELINK_COUNTER_COMPLETED  = Counter  ('context_deletelink_counter_completed',
                                          'Context:DeleteLink counter of requests completed')
DELETELINK_COUNTER_FAILED     = Counter  ('context_deletelink_counter_failed',
                                          'Context:DeleteLink counter of requests failed'   )
DELETELINK_HISTOGRAM_DURATION = Histogram('context_deletelink_histogram_duration',
                                          'Context:DeleteLink histogram of request duration')

class ContextServiceServicerImpl(ContextServiceServicer):
    """gRPC servicer implementing the GetTopology, AddLink and DeleteLink RPCs.

    All RPCs work on the topology addressed by ``DEFAULT_CONTEXT_ID`` / ``DEFAULT_TOPOLOGY_ID``
    in the database handed to the constructor.

    Each RPC follows the same error-handling pattern:
      * its "started" counter is incremented on entry and its "completed" counter right before
        a successful return; the whole call is timed by the matching duration histogram;
      * a ``ServiceException`` (raised by the validation / pre-condition checks) aborts the RPC with
        the exception's own status code and details, without touching the "failed" counter;
      * any other exception is logged, counted in the "failed" counter and aborts the RPC with
        ``grpc.StatusCode.INTERNAL``.
    """

    def __init__(self, database : Database):
        """Keep a reference to the database used by every RPC.

        Args:
            database: database object queried and modified by the RPC handlers.
        """
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    @GETTOPOLOGY_HISTOGRAM_DURATION.time()
    def GetTopology(self, request : Empty, grpc_context : grpc.ServicerContext) -> Topology:
        """Return the default topology as a ``Topology`` message.

        Args:
            request: empty request message; only logged.
            grpc_context: gRPC context, used to abort the RPC on error.

        Returns:
            ``Topology`` built from the keyword arguments produced by the topology's ``dump()``.
        """
        GETTOPOLOGY_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('GetTopology request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            # NOTE(review): create() is presumably get-or-create on the database objects - confirm
            # against the Database API.
            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            # ----- Retrieve data from the database --------------------------------------------------------------------
            json_topology = db_topology.dump()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Topology(**json_topology)
            LOGGER.debug('GetTopology reply: {}'.format(str(reply)))
            GETTOPOLOGY_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:                               # pragma: no cover (ServiceException not thrown)
            grpc_context.abort(e.code, e.details)                   # pragma: no cover (ServiceException not thrown)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('GetTopology exception')               # pragma: no cover
            GETTOPOLOGY_COUNTER_FAILED.inc()                        # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @ADDLINK_HISTOGRAM_DURATION.time()
    def AddLink(self, request : Link, grpc_context : grpc.ServicerContext) -> LinkId:
        """Validate a ``Link`` request and store the link with its endpoints in the default topology.

        The whole request is validated before anything is written: the link database object is only
        created once every endpoint has passed all checks.

        Args:
            request: link to add; ``request.link_id.link_id.uuid`` identifies it and
                ``request.endpointList`` lists its endpoints.
            grpc_context: gRPC context, used to abort the RPC on error.

        Returns:
            ``LinkId`` built from the new link's ``dump_id()``.

        The RPC is aborted with:
            * ``INVALID_ARGUMENT`` - a mandatory string field fails ``chk_string``, an endpoint names a
              context/topology other than the default one, or a device appears twice in the link;
            * ``ALREADY_EXISTS`` - the link is already in the topology;
            * ``NOT_FOUND`` - an endpoint's device or port is not in the topology;
            * ``INTERNAL`` - any unexpected exception.
        """
        ADDLINK_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('AddLink request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                link_id = chk_string('link.link_id.link_id.uuid',
                                     request.link_id.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if db_topology.links.contains(link_id):
                msg = 'Link({}) already exists in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            # device uuid -> port uuids already accepted for this link (used to detect duplicates)
            added_devices_and_endpoints : Dict[str, Set[str]] = {}
            # validated (device uuid, port uuid) pairs, in request order, written to the database below
            device_endpoint_pairs : List[Tuple[str, str]] = []
            for i,endpoint in enumerate(request.endpointList):
                try:
                    ep_context_id  = chk_string('endpoint[#{}].topoId.contextId.contextUuid.uuid'.format(i),
                                                endpoint.topoId.contextId.contextUuid.uuid,
                                                allow_empty=True)
                    ep_topology_id = chk_string('endpoint[#{}].topoId.topoId.uuid'.format(i),
                                                endpoint.topoId.topoId.uuid,
                                                allow_empty=True)
                    ep_device_id   = chk_string('endpoint[#{}].dev_id.device_id.uuid'.format(i),
                                                endpoint.dev_id.device_id.uuid,
                                                allow_empty=False)
                    ep_port_id     = chk_string('endpoint[#{}].port_id.uuid'.format(i),
                                                endpoint.port_id.uuid,
                                                allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

                # An empty context id falls back to the default; any other value is rejected.
                if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
                    msg = ' '.join([
                        'Unsupported Context({}) in Endpoint(#{}) of Link({}).',
                        'Only default Context({}) is currently supported.',
                        'Optionally, leave field empty to use default Context.',
                    ])
                    msg = msg.format(ep_context_id, i, link_id, DEFAULT_CONTEXT_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_context_id) == 0:
                    ep_context_id = DEFAULT_CONTEXT_ID

                # Same rule for the topology id.
                if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
                    msg = ' '.join([
                        'Unsupported Topology({}) in Endpoint(#{}) of Link({}).',
                        'Only default Topology({}) is currently supported.',
                        'Optionally, leave field empty to use default Topology.',
                    ])
                    msg = msg.format(ep_topology_id, i, link_id, DEFAULT_TOPOLOGY_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_topology_id) == 0:
                    ep_topology_id = DEFAULT_TOPOLOGY_ID

                # A device may contribute at most one endpoint to a link.
                if ep_device_id in added_devices_and_endpoints:
                    msg = 'Duplicated Device({}) in Endpoint(#{}) of Link({}).'
                    msg = msg.format(ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

                if not db_topology.devices.contains(ep_device_id):
                    msg = ' '.join([
                        'Context({})/Topology({})/Device({}) in Endpoint(#{}) of Link({})',
                        'does not exist in the database.',
                    ])
                    # The template has no Port placeholder, so ep_port_id must not be passed: str.format
                    # ignores surplus arguments and it would shift the endpoint index and link id into the
                    # wrong slots.
                    msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints = added_devices_and_endpoints.setdefault(ep_device_id, set())

                # should never happen since same device cannot appear 2 times in the link
                if ep_port_id in added_device_and_endpoints:                                # pragma: no cover
                    msg = 'Duplicated Device({})/Port({}) in Endpoint(#{}) of Link({}).'    # pragma: no cover
                    msg = msg.format(ep_device_id, ep_port_id, i, link_id)                  # pragma: no cover
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)           # pragma: no cover

                if not db_topology.device(ep_device_id).endpoints.contains(ep_port_id):
                    msg = ' '.join([
                        'Context({})/Topology({})/Device({})/Port({}) in Endpoint(#{}) of Link({})',
                        'does not exist in the database.',
                    ])
                    msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, ep_port_id, i, link_id)
                    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

                added_device_and_endpoints.add(ep_port_id)
                device_endpoint_pairs.append((ep_device_id, ep_port_id))

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_link = db_topology.link(link_id).create()
            for device_id,endpoint_id in device_endpoint_pairs:
                # Link endpoints are keyed as '<device uuid>/<port uuid>'.
                link_endpoint_id = '{}/{}'.format(device_id, endpoint_id)
                db_endpoint = db_topology.device(device_id).endpoint(endpoint_id)
                db_link.endpoint(link_endpoint_id).create(db_endpoint)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = LinkId(**db_link.dump_id())
            LOGGER.debug('AddLink reply: {}'.format(str(reply)))
            ADDLINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddLink exception')                   # pragma: no cover
            ADDLINK_COUNTER_FAILED.inc()                            # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @DELETELINK_HISTOGRAM_DURATION.time()
    def DeleteLink(self, request : LinkId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete an existing link from the default topology.

        Args:
            request: identifier of the link to delete (``request.link_id.uuid``).
            grpc_context: gRPC context, used to abort the RPC on error.

        Returns:
            An ``Empty`` message on success.

        The RPC is aborted with:
            * ``INVALID_ARGUMENT`` - the link uuid fails ``chk_string`` (it must not be empty);
            * ``NOT_FOUND`` - the link is not in the topology;
            * ``INTERNAL`` - any unexpected exception.
        """
        DELETELINK_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('DeleteLink request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                link_id = chk_string('link_id.link_id.uuid',
                                     request.link_id.uuid,
                                     allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.links.contains(link_id):
                msg = 'Link({}) does not exist in the database.'
                msg = msg.format(link_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_topology.link(link_id).delete()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteLink reply: {}'.format(str(reply)))
            DELETELINK_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteLink exception')                # pragma: no cover
            DELETELINK_COUNTER_FAILED.inc()                         # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover