Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 38.3 KiB
Newer Older
import grpc, json, logging, operator, threading
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Iterator, List, Set, Tuple
from common.message_broker.MessageBroker import MessageBroker
from common.orm.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.HighLevel import (
    get_all_objects, get_object, get_or_create_object, get_related_objects, update_or_create_object)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.backend.Tools import key_to_str
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId,
    ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, Link,
    LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Topology,
    TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from context.proto.context_pb2_grpc import ContextServiceServicer
from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, grpc_config_rules_to_raw, update_config
from context.service.database.ConnectionModel import ConnectionModel, PathHopModel, PathModel, set_path
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ConstraintModel import ConstraintModel, ConstraintsModel, set_constraints
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Events import notify_event
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.LinkModel import LinkModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.RelationModels import (
    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, TopologyDeviceModel, TopologyLinkModel)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ServiceModel import (
    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.TopologyModel import TopologyModel
from .Constants import (
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_TOPOLOGY)

LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'Context'
METHOD_NAMES = [
    'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
    'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
    'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, database : Database, messagebroker : MessageBroker):
        LOGGER.debug('Creating Servicer...')
        self.lock = threading.Lock()
        self.database = database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        with self.lock:
            db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
            db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
            return ContextIdList(context_ids=[db_context.dump_id() for db_context in db_contexts])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        with self.lock:
            db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
            db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
            return ContextList(contexts=[db_context.dump() for db_context in db_contexts])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        with self.lock:
            context_uuid = request.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
            return Context(**db_context.dump(include_services=True, include_topologies=True))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        with self.lock:
            context_uuid = request.context_id.context_uuid.uuid

            for i,topology_id in enumerate(request.topology_ids):
                topology_context_uuid = topology_id.context_id.context_uuid.uuid
                if topology_context_uuid != context_uuid:
                    raise InvalidArgumentException(
                        'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
                        ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

            for i,service_id in enumerate(request.service_ids):
                service_context_uuid = service_id.context_id.context_uuid.uuid
                if service_context_uuid != context_uuid:
                    raise InvalidArgumentException(
                        'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
                        ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

            result : Tuple[ContextModel, bool] = update_or_create_object(
                self.database, ContextModel, context_uuid, {'context_uuid': context_uuid})
            db_context, updated = result

            for i,topology_id in enumerate(request.topology_ids):
                topology_context_uuid = topology_id.context_id.context_uuid.uuid
                topology_uuid = topology_id.topology_uuid.uuid
                get_object(self.database, TopologyModel, [context_uuid, topology_uuid]) # just to confirm it exists

            for i,service_id in enumerate(request.service_ids):
                service_context_uuid = service_id.context_id.context_uuid.uuid
                service_uuid = service_id.service_uuid.uuid
                get_object(self.database, ServiceModel, [context_uuid, service_uuid]) # just to confirm it exists

            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_context_id = db_context.dump_id()
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
            return ContextId(**dict_context_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            context_uuid = request.context_uuid.uuid
            db_context = ContextModel(self.database, context_uuid, auto_load=False)
            found = db_context.load()
            if not found: return Empty()
            dict_context_id = db_context.dump_id()
            db_context.delete()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
            yield ContextEvent(**json.loads(message.content))


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        with self.lock:
            context_uuid = request.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
            db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
            db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
            return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        with self.lock:
            context_uuid = request.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
            db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
            db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
            return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        with self.lock:
            str_key = key_to_str([request.context_id.context_uuid.uuid, request.topology_uuid.uuid])
            db_topology : TopologyModel = get_object(self.database, TopologyModel, str_key)
            return Topology(**db_topology.dump(include_devices=True, include_links=True))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        with self.lock:
            context_uuid = request.topology_id.context_id.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

            topology_uuid = request.topology_id.topology_uuid.uuid
            str_topology_key = key_to_str([context_uuid, topology_uuid])
            result : Tuple[TopologyModel, bool] = update_or_create_object(
                self.database, TopologyModel, str_topology_key, {'context_fk': db_context, 'topology_uuid': topology_uuid})
            db_topology,updated = result

            for device_id in request.device_ids:
                device_uuid = device_id.device_uuid.uuid
                db_device = get_object(self.database, DeviceModel, device_uuid)
                str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
                result : Tuple[TopologyDeviceModel, bool] = update_or_create_object(
                    self.database, TopologyDeviceModel, str_topology_device_key,
                    {'topology_fk': db_topology, 'device_fk': db_device})
                #db_topology_device,topology_device_updated = result

            for link_id in request.link_ids:
                link_uuid = link_id.link_uuid.uuid
                db_link = get_object(self.database, LinkModel, link_uuid)

                str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
                result : Tuple[TopologyLinkModel, bool] = update_or_create_object(
                    self.database, TopologyLinkModel, str_topology_link_key,
                    {'topology_fk': db_topology, 'link_fk': db_link})
                #db_topology_link,topology_link_updated = result

            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_topology_id = db_topology.dump_id()
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
            return TopologyId(**dict_topology_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            context_uuid = request.context_id.context_uuid.uuid
            topology_uuid = request.topology_uuid.uuid
            db_topology = TopologyModel(self.database, key_to_str([context_uuid, topology_uuid]), auto_load=False)
            found = db_topology.load()
            if not found: return Empty()
            dict_topology_id = db_topology.dump_id()
            db_topology.delete()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
            yield TopologyEvent(**json.loads(message.content))


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        with self.lock:
            db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
            db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
            return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        with self.lock:
            db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
            db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
            return DeviceList(devices=[db_device.dump() for db_device in db_devices])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        with self.lock:
            device_uuid = request.device_uuid.uuid
            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid)
            return Device(**db_device.dump(
                include_config_rules=True, include_drivers=True, include_endpoints=True))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
        with self.lock:
            device_uuid = request.device_id.device_uuid.uuid

            for i,endpoint in enumerate(request.device_endpoints):
                endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
                if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
                if device_uuid != endpoint_device_uuid:
                    raise InvalidArgumentException(
                        'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
                        ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])

            config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
            running_config_result = update_config(self.database, device_uuid, 'running', config_rules)
            db_running_config = running_config_result[0][0]

            result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, {
                'device_uuid'              : device_uuid,
                'device_type'              : request.device_type,
                'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
                'device_config_fk'         : db_running_config,
            })
            db_device, updated = result

            set_drivers(self.database, db_device, request.device_drivers)

            for i,endpoint in enumerate(request.device_endpoints):
                endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
                if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid

                str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                endpoint_attributes = {
                    'device_fk'    : db_device,
                    'endpoint_uuid': endpoint_uuid,
                    'endpoint_type': endpoint.endpoint_type,
                }

                endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
                endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                    db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)

                    str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
                    result : Tuple[TopologyDeviceModel, bool] = get_or_create_object(
                        self.database, TopologyDeviceModel, str_topology_device_key, {
                            'topology_fk': db_topology, 'device_fk': db_device})
                    #db_topology_device, topology_device_created = result

                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
                    endpoint_attributes['topology_fk'] = db_topology

                result : Tuple[EndPointModel, bool] = update_or_create_object(
                    self.database, EndPointModel, str_endpoint_key, endpoint_attributes)
                db_endpoint, endpoint_updated = result

                set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types)

            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_device_id = db_device.dump_id()
            notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
            return DeviceId(**dict_device_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            device_uuid = request.device_uuid.uuid
            db_device = DeviceModel(self.database, device_uuid, auto_load=False)
            found = db_device.load()
            if not found: return Empty()
            dict_device_id = db_device.dump_id()
            for db_endpoint_pk,_ in db_device.references(EndPointModel):
                db_endpoint = EndPointModel(self.database, db_endpoint_pk)
                for db_kpi_sample_type_pk,_ in db_endpoint.references(KpiSampleTypeModel):
                    KpiSampleTypeModel(self.database, db_kpi_sample_type_pk).delete()
                db_endpoint.delete()
            for db_topology_device_pk,_ in db_device.references(TopologyDeviceModel):
                TopologyDeviceModel(self.database, db_topology_device_pk).delete()
            for db_driver_pk,_ in db_device.references(DriverModel):
                DriverModel(self.database, db_driver_pk).delete()
            db_config = ConfigModel(self.database, db_device.device_config_fk)
            for db_config_rule_pk,_ in db_config.references(ConfigRuleModel):
                ConfigRuleModel(self.database, db_config_rule_pk).delete()
            db_device.delete()
            db_config.delete()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield DeviceEvent(**json.loads(message.content))


    # ----- Link -------------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        with self.lock:
            db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
            db_links = sorted(db_links, key=operator.attrgetter('pk'))
            return LinkIdList(link_ids=[db_link.dump_id() for db_link in db_links])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        with self.lock:
            db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
            db_links = sorted(db_links, key=operator.attrgetter('pk'))
            return LinkList(links=[db_link.dump() for db_link in db_links])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        with self.lock:
            link_uuid = request.link_uuid.uuid
            db_link : LinkModel = get_object(self.database, LinkModel, link_uuid)
            return Link(**db_link.dump())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
        with self.lock:
            link_uuid = request.link_id.link_uuid.uuid
            result : Tuple[LinkModel, bool] = update_or_create_object(
                self.database, LinkModel, link_uuid, {'link_uuid': link_uuid})
            db_link, updated = result

            for endpoint_id in request.link_endpoint_ids:
                endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
                endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
                endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid

                str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])

                db_topology = None
                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                    db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
                    str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--')
                    get_object(self.database, TopologyDeviceModel, str_topology_device_key) # check device is in topology
                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')

                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)

                str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--')
                result : Tuple[LinkEndPointModel, bool] = get_or_create_object(
                    self.database, LinkEndPointModel, str_link_endpoint_key, {
                        'link_fk': db_link, 'endpoint_fk': db_endpoint})
                #db_link_endpoint, link_endpoint_created = result

                if db_topology is not None:
                    str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
                    result : Tuple[TopologyLinkModel, bool] = get_or_create_object(
                        self.database, TopologyLinkModel, str_topology_link_key, {
                            'topology_fk': db_topology, 'link_fk': db_link})
                    #db_topology_link, topology_link_created = result

            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_link_id = db_link.dump_id()
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
            return LinkId(**dict_link_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            link_uuid = request.link_uuid.uuid
            db_link = LinkModel(self.database, link_uuid, auto_load=False)
            found = db_link.load()
            if not found: return Empty()
            dict_link_id = db_link.dump_id()
            for db_link_endpoint_pk,_ in db_link.references(LinkEndPointModel):
                LinkEndPointModel(self.database, db_link_endpoint_pk).delete()
            for db_topology_link_pk,_ in db_link.references(TopologyLinkModel):
                TopologyLinkModel(self.database, db_topology_link_pk).delete()
            db_link.delete()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
            yield LinkEvent(**json.loads(message.content))


    # ----- Service ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        with self.lock:
            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
            db_services = sorted(db_services, key=operator.attrgetter('pk'))
            return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        with self.lock:
            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
            db_services = sorted(db_services, key=operator.attrgetter('pk'))
            return ServiceList(services=[db_service.dump() for db_service in db_services])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        with self.lock:
            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
            return Service(**db_service.dump(
                include_endpoint_ids=True, include_constraints=True, include_config_rules=True))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        with self.lock:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

            for i,endpoint_id in enumerate(request.service_endpoint_ids):
                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
                    raise InvalidArgumentException(
                        'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                        endpoint_topology_context_uuid,
                        ['should be == {:s}({:s})'.format('request.service_id.context_id.context_uuid.uuid', context_uuid)])

            service_uuid = request.service_id.service_uuid.uuid
            str_service_key = key_to_str([context_uuid, service_uuid])

            constraints_result = set_constraints(
                self.database, str_service_key, 'constraints', request.service_constraints)
            db_constraints = constraints_result[0][0]

            config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
            running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
            db_running_config = running_config_result[0][0]

            result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
                'context_fk'            : db_context,
                'service_uuid'          : service_uuid,
                'service_type'          : grpc_to_enum__service_type(request.service_type),
                'service_constraints_fk': db_constraints,
                'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
                'service_config_fk'     : db_running_config,
            })
            db_service, updated = result

            for i,endpoint_id in enumerate(request.service_endpoint_ids):
                endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
                endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
                endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid

                str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')

                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)

                str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
                result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
                    self.database, ServiceEndPointModel, str_service_endpoint_key, {
                        'service_fk': db_service, 'endpoint_fk': db_endpoint})
                #db_service_endpoint, service_endpoint_created = result

            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_service_id = db_service.dump_id()
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
            return ServiceId(**dict_service_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            context_uuid = request.context_id.context_uuid.uuid
            service_uuid = request.service_uuid.uuid
            db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False)
            found = db_service.load()
            if not found: return Empty()
            dict_service_id = db_service.dump_id()
            for db_service_endpoint_pk,_ in db_service.references(ServiceEndPointModel):
                ServiceEndPointModel(self.database, db_service_endpoint_pk).delete()
            db_config = ConfigModel(self.database, db_service.service_config_fk)
            for db_config_rule_pk,_ in db_config.references(ConfigRuleModel):
                ConfigRuleModel(self.database, db_config_rule_pk).delete()
            db_constraints = ConstraintsModel(self.database, db_service.service_constraints_fk)
            for db_constraint_pk,_ in db_constraints.references(ConstraintModel):
                ConstraintModel(self.database, db_constraint_pk).delete()
            db_service.delete()
            db_config.delete()
            db_constraints.delete()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield ServiceEvent(**json.loads(message.content))


    # ----- Connection -------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        with self.lock:
            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
            db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
            db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
            return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnections(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        with self.lock:
            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
            db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
            db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
            return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
        with self.lock:
            db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid)
            return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
        with self.lock:
            connection_uuid = request.connection_id.connection_uuid.uuid
            connection_attributes = {'connection_uuid': connection_uuid}
            service_context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            if len(service_context_uuid) > 0 and len(service_uuid) > 0:
                str_service_key = key_to_str([service_context_uuid, service_uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
                connection_attributes['service_fk'] = db_service
            path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '')
            db_path = path_hops_result[0]
            connection_attributes['path_fk'] = db_path
            result : Tuple[ConnectionModel, bool] = update_or_create_object(
                self.database, ConnectionModel, connection_uuid, connection_attributes)
            db_connection, updated = result
            for sub_service_id in request.sub_service_ids:
                sub_service_uuid         = sub_service_id.service_uuid.uuid
                sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid
                str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key)
                str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--')
                result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object(
                    self.database, ConnectionSubServiceModel, str_connection_sub_service_key, {
                        'connection_fk': db_connection, 'sub_service_fk': db_service})
                #db_connection_sub_service, connection_sub_service_created = result
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            dict_connection_id = db_connection.dump_id()
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
            return ConnectionId(**dict_connection_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
        with self.lock:
            db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False)
            found = db_connection.load()
            if not found: return Empty()

            dict_connection_id = db_connection.dump_id()

            db_path = PathModel(self.database, db_connection.path_fk)
            for db_path_hop_pk,_ in db_path.references(PathHopModel):
                PathHopModel(self.database, db_path_hop_pk).delete()

            # Do not remove sub-services automatically. They are supported by real services, so Service component should
            # deal with the correct removal workflow to deconfigure the devices.
            for db_connection_sub_service_pk,_ in db_connection.references(ConnectionSubServiceModel):
                db_connection_sub_service : ConnectionSubServiceModel = get_object(
                    self.database, ConnectionSubServiceModel, db_connection_sub_service_pk)
                db_connection_sub_service.delete()

            db_connection.delete()
            db_path.delete()

            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
            yield ConnectionEvent(**json.loads(message.content))