Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 44.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import grpc, json, logging, operator, threading
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from typing import Iterator, List, Set, Tuple
    from common.message_broker.MessageBroker import MessageBroker
    
    from common.orm.Database import Database
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.HighLevel import (
        get_all_objects, get_object, get_or_create_object, get_related_objects, update_or_create_object)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.orm.backend.Tools import key_to_str
    
    from common.proto.context_pb2 import (
        Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
        Context, ContextEvent, ContextId, ContextIdList, ContextList,
        Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
        Empty, EventTypeEnum,
        Link, LinkEvent, LinkId, LinkIdList, LinkList,
        Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
        Slice, SliceEvent, SliceId, SliceIdList, SliceList,
        Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
    from common.proto.context_pb2_grpc import ContextServiceServicer
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
    from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.ConfigModel import grpc_config_rules_to_raw, update_config
    from context.service.database.ConnectionModel import ConnectionModel, set_path
    from context.service.database.ConstraintModel import set_constraints
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.ContextModel import ContextModel
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers
    from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.Events import notify_event
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.LinkModel import LinkModel
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.RelationModels import (
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
        SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.ServiceModel import (
        ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
    
    from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.service.database.TopologyModel import TopologyModel
    
    from .Constants import (
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
        TOPIC_TOPOLOGY)
    
    
    LOGGER = logging.getLogger(__name__)
    
    
    SERVICE_NAME = 'Context'
    METHOD_NAMES = [
    
        'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
        'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
        'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
        'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
        'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
        'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    
        'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    
    ]
    METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
    
    class ContextServiceServicerImpl(ContextServiceServicer):
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def __init__(self, database : Database, messagebroker : MessageBroker):
    
            LOGGER.debug('Creating Servicer...')
    
            self.lock = threading.Lock()
    
            self.database = database
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self.messagebroker = messagebroker
    
            LOGGER.debug('Servicer Created')
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
        # ----- Context ----------------------------------------------------------------------------------------------------
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
    
            with self.lock:
                db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
                db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
                return ContextIdList(context_ids=[db_context.dump_id() for db_context in db_contexts])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
    
            with self.lock:
                db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
                db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
                return ContextList(contexts=[db_context.dump() for db_context in db_contexts])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
    
            with self.lock:
                context_uuid = request.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
                return Context(**db_context.dump(include_services=True, include_topologies=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
    
            with self.lock:
                context_uuid = request.context_id.context_uuid.uuid
    
                for i,topology_id in enumerate(request.topology_ids):
                    topology_context_uuid = topology_id.context_id.context_uuid.uuid
                    if topology_context_uuid != context_uuid:
                        raise InvalidArgumentException(
                            'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
                            ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
    
                for i,service_id in enumerate(request.service_ids):
                    service_context_uuid = service_id.context_id.context_uuid.uuid
                    if service_context_uuid != context_uuid:
                        raise InvalidArgumentException(
                            'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
                            ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
    
                result : Tuple[ContextModel, bool] = update_or_create_object(
                    self.database, ContextModel, context_uuid, {'context_uuid': context_uuid})
                db_context, updated = result
    
                for i,topology_id in enumerate(request.topology_ids):
                    topology_context_uuid = topology_id.context_id.context_uuid.uuid
                    topology_uuid = topology_id.topology_uuid.uuid
                    get_object(self.database, TopologyModel, [context_uuid, topology_uuid]) # just to confirm it exists
    
                for i,service_id in enumerate(request.service_ids):
                    service_context_uuid = service_id.context_id.context_uuid.uuid
                    service_uuid = service_id.service_uuid.uuid
                    get_object(self.database, ServiceModel, [context_uuid, service_uuid]) # just to confirm it exists
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_context_id = db_context.dump_id()
                notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
                return ContextId(**dict_context_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                context_uuid = request.context_uuid.uuid
                db_context = ContextModel(self.database, context_uuid, auto_load=False)
                found = db_context.load()
                if not found: return Empty()
    
                dict_context_id = db_context.dump_id()
                db_context.delete()
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
                yield ContextEvent(**json.loads(message.content))
    
    
        # ----- Topology ---------------------------------------------------------------------------------------------------
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
    
            with self.lock:
                context_uuid = request.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
                db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
                db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
                return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
    
            with self.lock:
                context_uuid = request.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
                db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
                db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
                return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
    
            with self.lock:
                str_key = key_to_str([request.context_id.context_uuid.uuid, request.topology_uuid.uuid])
                db_topology : TopologyModel = get_object(self.database, TopologyModel, str_key)
                return Topology(**db_topology.dump(include_devices=True, include_links=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
    
            with self.lock:
                context_uuid = request.topology_id.context_id.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
    
                topology_uuid = request.topology_id.topology_uuid.uuid
                str_topology_key = key_to_str([context_uuid, topology_uuid])
                result : Tuple[TopologyModel, bool] = update_or_create_object(
    
                    self.database, TopologyModel, str_topology_key, {
                        'context_fk': db_context, 'topology_uuid': topology_uuid})
    
                db_topology,updated = result
    
                for device_id in request.device_ids:
                    device_uuid = device_id.device_uuid.uuid
                    db_device = get_object(self.database, DeviceModel, device_uuid)
                    str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
                    result : Tuple[TopologyDeviceModel, bool] = update_or_create_object(
                        self.database, TopologyDeviceModel, str_topology_device_key,
                        {'topology_fk': db_topology, 'device_fk': db_device})
                    #db_topology_device,topology_device_updated = result
    
                for link_id in request.link_ids:
                    link_uuid = link_id.link_uuid.uuid
                    db_link = get_object(self.database, LinkModel, link_uuid)
    
                    str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
                    result : Tuple[TopologyLinkModel, bool] = update_or_create_object(
                        self.database, TopologyLinkModel, str_topology_link_key,
                        {'topology_fk': db_topology, 'link_fk': db_link})
                    #db_topology_link,topology_link_updated = result
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_topology_id = db_topology.dump_id()
                notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
                return TopologyId(**dict_topology_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                context_uuid = request.context_id.context_uuid.uuid
                topology_uuid = request.topology_uuid.uuid
                db_topology = TopologyModel(self.database, key_to_str([context_uuid, topology_uuid]), auto_load=False)
                found = db_topology.load()
                if not found: return Empty()
    
                dict_topology_id = db_topology.dump_id()
                db_topology.delete()
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
                yield TopologyEvent(**json.loads(message.content))
    
    
        # ----- Device -----------------------------------------------------------------------------------------------------
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
    
            with self.lock:
                db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
                db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
                return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
    
            with self.lock:
                db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
                db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
                return DeviceList(devices=[db_device.dump() for db_device in db_devices])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
    
            with self.lock:
                device_uuid = request.device_uuid.uuid
                db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid)
                return Device(**db_device.dump(
                    include_config_rules=True, include_drivers=True, include_endpoints=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
    
            with self.lock:
                device_uuid = request.device_id.device_uuid.uuid
    
                for i,endpoint in enumerate(request.device_endpoints):
                    endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
                    if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
                    if device_uuid != endpoint_device_uuid:
                        raise InvalidArgumentException(
                            'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
                            ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
    
                config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
                running_config_result = update_config(self.database, device_uuid, 'running', config_rules)
                db_running_config = running_config_result[0][0]
    
                result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, {
                    'device_uuid'              : device_uuid,
                    'device_type'              : request.device_type,
                    'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
                    'device_config_fk'         : db_running_config,
                })
                db_device, updated = result
    
                set_drivers(self.database, db_device, request.device_drivers)
    
                for i,endpoint in enumerate(request.device_endpoints):
                    endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                    endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
                    if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
    
                    str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                    endpoint_attributes = {
                        'device_fk'    : db_device,
                        'endpoint_uuid': endpoint_uuid,
                        'endpoint_type': endpoint.endpoint_type,
                    }
    
                    endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
                    endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
                    if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                        str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                        db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
    
                        str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
                        result : Tuple[TopologyDeviceModel, bool] = get_or_create_object(
                            self.database, TopologyDeviceModel, str_topology_device_key, {
                                'topology_fk': db_topology, 'device_fk': db_device})
                        #db_topology_device, topology_device_created = result
    
                        str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
                        endpoint_attributes['topology_fk'] = db_topology
    
                    result : Tuple[EndPointModel, bool] = update_or_create_object(
                        self.database, EndPointModel, str_endpoint_key, endpoint_attributes)
                    db_endpoint, endpoint_updated = result
    
                    set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types)
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_device_id = db_device.dump_id()
                notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
                return DeviceId(**dict_device_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                device_uuid = request.device_uuid.uuid
                db_device = DeviceModel(self.database, device_uuid, auto_load=False)
                found = db_device.load()
                if not found: return Empty()
    
                dict_device_id = db_device.dump_id()
                db_device.delete()
    
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield DeviceEvent(**json.loads(message.content))
    
    
        # ----- Link -------------------------------------------------------------------------------------------------------
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
    
            with self.lock:
                db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
                db_links = sorted(db_links, key=operator.attrgetter('pk'))
                return LinkIdList(link_ids=[db_link.dump_id() for db_link in db_links])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
    
            with self.lock:
                db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
                db_links = sorted(db_links, key=operator.attrgetter('pk'))
                return LinkList(links=[db_link.dump() for db_link in db_links])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
    
            with self.lock:
                link_uuid = request.link_uuid.uuid
                db_link : LinkModel = get_object(self.database, LinkModel, link_uuid)
                return Link(**db_link.dump())
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
    
            with self.lock:
                link_uuid = request.link_id.link_uuid.uuid
                result : Tuple[LinkModel, bool] = update_or_create_object(
                    self.database, LinkModel, link_uuid, {'link_uuid': link_uuid})
                db_link, updated = result
    
                for endpoint_id in request.link_endpoint_ids:
                    endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
                    endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
                    endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
    
                    str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
    
                    db_topology = None
                    if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                        str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                        db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
                        str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--')
    
                        # check device is in topology
                        get_object(self.database, TopologyDeviceModel, str_topology_device_key)
    
                        str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
    
                    db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
    
                    str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--')
                    result : Tuple[LinkEndPointModel, bool] = get_or_create_object(
                        self.database, LinkEndPointModel, str_link_endpoint_key, {
                            'link_fk': db_link, 'endpoint_fk': db_endpoint})
                    #db_link_endpoint, link_endpoint_created = result
    
                    if db_topology is not None:
                        str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
                        result : Tuple[TopologyLinkModel, bool] = get_or_create_object(
                            self.database, TopologyLinkModel, str_topology_link_key, {
                                'topology_fk': db_topology, 'link_fk': db_link})
                        #db_topology_link, topology_link_created = result
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_link_id = db_link.dump_id()
                notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
                return LinkId(**dict_link_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                link_uuid = request.link_uuid.uuid
                db_link = LinkModel(self.database, link_uuid, auto_load=False)
                found = db_link.load()
                if not found: return Empty()
    
                dict_link_id = db_link.dump_id()
                db_link.delete()
    
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
                yield LinkEvent(**json.loads(message.content))
    
    
        # ----- Service ----------------------------------------------------------------------------------------------------
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
    
            with self.lock:
                db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
                db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
                db_services = sorted(db_services, key=operator.attrgetter('pk'))
                return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
    
            with self.lock:
                db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
                db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
                db_services = sorted(db_services, key=operator.attrgetter('pk'))
                return ServiceList(services=[db_service.dump() for db_service in db_services])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
    
            with self.lock:
                str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
                return Service(**db_service.dump(
                    include_endpoint_ids=True, include_constraints=True, include_config_rules=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
    
            with self.lock:
                context_uuid = request.service_id.context_id.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
    
                for i,endpoint_id in enumerate(request.service_endpoint_ids):
                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                    if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
                        raise InvalidArgumentException(
                            'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                            endpoint_topology_context_uuid,
    
                            ['should be == {:s}({:s})'.format(
                                'request.service_id.context_id.context_uuid.uuid', context_uuid)])
    
    
                service_uuid = request.service_id.service_uuid.uuid
                str_service_key = key_to_str([context_uuid, service_uuid])
    
                constraints_result = set_constraints(
                    self.database, str_service_key, 'constraints', request.service_constraints)
                db_constraints = constraints_result[0][0]
    
                config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
                running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
                db_running_config = running_config_result[0][0]
    
                result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
                    'context_fk'            : db_context,
                    'service_uuid'          : service_uuid,
                    'service_type'          : grpc_to_enum__service_type(request.service_type),
                    'service_constraints_fk': db_constraints,
                    'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
                    'service_config_fk'     : db_running_config,
                })
                db_service, updated = result
    
                for i,endpoint_id in enumerate(request.service_endpoint_ids):
                    endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
                    endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
                    endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
    
                    str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
                    if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                        str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                        str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
    
                    db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
    
                    str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
                    result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
                        self.database, ServiceEndPointModel, str_service_endpoint_key, {
                            'service_fk': db_service, 'endpoint_fk': db_endpoint})
                    #db_service_endpoint, service_endpoint_created = result
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_service_id = db_service.dump_id()
                notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
                return ServiceId(**dict_service_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                context_uuid = request.context_id.context_uuid.uuid
                service_uuid = request.service_uuid.uuid
                db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False)
                found = db_service.load()
                if not found: return Empty()
    
                dict_service_id = db_service.dump_id()
                db_service.delete()
    
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield ServiceEvent(**json.loads(message.content))
    
        # ----- Slice ----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList:
            with self.lock:
                db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
                db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
                db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
                return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices])
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList:
            with self.lock:
                db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
                db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
                db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
                return SliceList(slices=[db_slice.dump() for db_slice in db_slices])
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice:
            with self.lock:
                str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid])
                db_slice : SliceModel = get_object(self.database, SliceModel, str_key)
                return Slice(**db_slice.dump(
    
                    include_endpoint_ids=True, include_constraints=True, include_config_rules=True,
                    include_service_ids=True, include_subslice_ids=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
            with self.lock:
                context_uuid = request.slice_id.context_id.context_uuid.uuid
                db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
    
                for i,endpoint_id in enumerate(request.slice_endpoint_ids):
                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                    if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
                        raise InvalidArgumentException(
                            'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                            endpoint_topology_context_uuid,
                            ['should be == {:s}({:s})'.format(
                                'request.slice_id.context_id.context_uuid.uuid', context_uuid)])
    
                slice_uuid = request.slice_id.slice_uuid.uuid
                str_slice_key = key_to_str([context_uuid, slice_uuid])
    
                constraints_result = set_constraints(
                    self.database, str_slice_key, 'constraints', request.slice_constraints)
                db_constraints = constraints_result[0][0]
    
    
                config_rules = grpc_config_rules_to_raw(request.slice_config.config_rules)
                running_config_result = update_config(self.database, str_slice_key, 'running', config_rules)
                db_running_config = running_config_result[0][0]
    
    
                result : Tuple[SliceModel, bool] = update_or_create_object(self.database, SliceModel, str_slice_key, {
                    'context_fk'          : db_context,
                    'slice_uuid'          : slice_uuid,
                    'slice_constraints_fk': db_constraints,
                    'slice_status'        : grpc_to_enum__slice_status(request.slice_status.slice_status),
    
                    'slice_config_fk'     : db_running_config,
    
                })
                db_slice, updated = result
    
                for i,endpoint_id in enumerate(request.slice_endpoint_ids):
                    endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
                    endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
                    endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
    
                    str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
                    if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                        str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                        str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
    
                    db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
    
                    str_slice_endpoint_key = key_to_str([slice_uuid, str_endpoint_key], separator='--')
                    result : Tuple[SliceEndPointModel, bool] = get_or_create_object(
                        self.database, SliceEndPointModel, str_slice_endpoint_key, {
                            'slice_fk': db_slice, 'endpoint_fk': db_endpoint})
                    #db_slice_endpoint, slice_endpoint_created = result
    
                for i,service_id in enumerate(request.slice_service_ids):
                    service_uuid         = service_id.service_uuid.uuid
                    service_context_uuid = service_id.context_id.context_uuid.uuid
                    str_service_key = key_to_str([service_context_uuid, service_uuid])
                    db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
    
                    str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--')
                    result : Tuple[SliceServiceModel, bool] = get_or_create_object(
                        self.database, SliceServiceModel, str_slice_service_key, {
                            'slice_fk': db_slice, 'service_fk': db_service})
                    #db_slice_service, slice_service_created = result
    
                for i,subslice_id in enumerate(request.slice_subslice_ids):
                    subslice_uuid         = subslice_id.slice_uuid.uuid
                    subslice_context_uuid = subslice_id.context_id.context_uuid.uuid
                    str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid])
                    db_subslice : SliceModel = get_object(self.database, SliceModel, str_subslice_key)
    
                    str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--')
                    result : Tuple[SliceSubSliceModel, bool] = get_or_create_object(
                        self.database, SliceSubSliceModel, str_slice_subslice_key, {
                            'slice_fk': db_slice, 'sub_slice_fk': db_subslice})
                    #db_slice_subslice, slice_subslice_created = result
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_slice_id = db_slice.dump_id()
                notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
                return SliceId(**dict_slice_id)
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
            with self.lock:
                context_uuid = request.context_id.context_uuid.uuid
                slice_uuid = request.slice_uuid.uuid
                db_slice = SliceModel(self.database, key_to_str([context_uuid, slice_uuid]), auto_load=False)
                found = db_slice.load()
                if not found: return Empty()
    
                dict_slice_id = db_slice.dump_id()
                db_slice.delete()
    
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
                return Empty()
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
            for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
                yield SliceEvent(**json.loads(message.content))
    
    
    
        # ----- Connection -------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
    
            with self.lock:
                str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
                db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
                db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
                return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ListConnections(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
    
            with self.lock:
                str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
                db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
                db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
                return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections])
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
    
            with self.lock:
                db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid)
                return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True))
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
    
            with self.lock:
                connection_uuid = request.connection_id.connection_uuid.uuid
    
                connection_attributes = {'connection_uuid': connection_uuid}
    
                service_context_uuid = request.service_id.context_id.context_uuid.uuid
                service_uuid = request.service_id.service_uuid.uuid
                if len(service_context_uuid) > 0 and len(service_uuid) > 0:
                    str_service_key = key_to_str([service_context_uuid, service_uuid])
                    db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
                    connection_attributes['service_fk'] = db_service
    
                path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '')
                db_path = path_hops_result[0]
                connection_attributes['path_fk'] = db_path
    
                result : Tuple[ConnectionModel, bool] = update_or_create_object(
                    self.database, ConnectionModel, connection_uuid, connection_attributes)
                db_connection, updated = result
    
                for sub_service_id in request.sub_service_ids:
                    sub_service_uuid         = sub_service_id.service_uuid.uuid
                    sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid
                    str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid])
                    db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key)
    
                    str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--')
                    result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object(
                        self.database, ConnectionSubServiceModel, str_connection_sub_service_key, {
                            'connection_fk': db_connection, 'sub_service_fk': db_service})
                    #db_connection_sub_service, connection_sub_service_created = result
    
                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
                dict_connection_id = db_connection.dump_id()
                notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
                return ConnectionId(**dict_connection_id)
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
    
            with self.lock:
                db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False)
                found = db_connection.load()
                if not found: return Empty()
    
                dict_connection_id = db_connection.dump_id()
                db_connection.delete()
    
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
                return Empty()
    
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
            for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
                yield ConnectionEvent(**json.loads(message.content))