Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 30.2 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, operator
from typing import Iterator, List, Set, Tuple
from common.message_broker.MessageBroker import MessageBroker
from common.orm.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.HighLevel import (
    get_all_objects, get_object, get_or_create_object, get_related_objects, update_or_create_object)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.backend.Tools import key_to_str
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import (
    Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    DeviceList, Empty, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId,
    ServiceIdList, ServiceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from context.proto.context_pb2_grpc import ContextServiceServicer
from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, grpc_config_rules_to_raw, update_config
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ConstraintModel import ConstraintModel, ConstraintsModel, set_constraints
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Events import notify_event
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.LinkModel import LinkModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.RelationModels import (
    LinkEndPointModel, ServiceEndPointModel, TopologyDeviceModel, TopologyLinkModel)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ServiceModel import (
    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.TopologyModel import TopologyModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.grpc_server.Constants import (
    CONSUME_TIMEOUT, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_TOPOLOGY)

# Module-level logger; also passed to the safe_and_metered_rpc_method decorator on every RPC handler.
LOGGER = logging.getLogger(__name__)

# Service name and the RPC method names handed to create_metrics() below.
SERVICE_NAME = 'Context'
METHOD_NAMES = [
    'ListContextIds',  'ListContexts',   'GetContext',  'SetContext',  'RemoveContext',  'GetContextEvents',
    'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents',
    'ListDeviceIds',   'ListDevices',    'GetDevice',   'SetDevice',   'RemoveDevice',   'GetDeviceEvents',
    'ListLinkIds',     'ListLinks',      'GetLink',     'SetLink',     'RemoveLink',     'GetLinkEvents',
    'ListServiceIds',  'ListServices',   'GetService',  'SetService',  'RemoveService',  'GetServiceEvents',
]
# Metrics object shared by all decorated RPC handlers of this servicer.
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, database : Database, messagebroker : MessageBroker):
        """Store the database and message broker handles used by the RPC handlers."""
        LOGGER.debug('Creating Servicer...')
        self.database, self.messagebroker = database, messagebroker
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the ids of every ContextModel in the database, ordered by the `pk` attribute."""
        ordered_contexts : List[ContextModel] = sorted(
            get_all_objects(self.database, ContextModel), key=operator.attrgetter('pk'))
        context_ids = [db_context.dump_id() for db_context in ordered_contexts]
        return ContextIdList(context_ids=context_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return the dump of every ContextModel in the database, ordered by the `pk` attribute."""
        ordered_contexts : List[ContextModel] = sorted(
            get_all_objects(self.database, ContextModel), key=operator.attrgetter('pk'))
        dumped_contexts = [db_context.dump() for db_context in ordered_contexts]
        return ContextList(contexts=dumped_contexts)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Look up one context by UUID and return it with its services and topologies included."""
        db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
        context_fields = db_context.dump(include_services=True, include_topologies=True)
        return Context(**context_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create or update a context and publish the matching CREATE/UPDATE event.

        Every topology id and service id carried in the request must reference the context being
        set; a mismatch raises InvalidArgumentException before anything is written. After the
        context record is written, each referenced topology and service is looked up with
        get_object (per the inline comments, only to confirm it exists).

        Returns the ContextId built from the stored context's dump_id().
        """
        context_uuid = request.context_id.context_uuid.uuid

        # Validation pass: all referenced topologies must live in this context.
        for i,topology_id in enumerate(request.topology_ids):
            topology_context_uuid = topology_id.context_id.context_uuid.uuid
            if topology_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

        # Validation pass: all referenced services must live in this context.
        for i,service_id in enumerate(request.service_ids):
            service_context_uuid = service_id.context_id.context_uuid.uuid
            if service_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

        result : Tuple[ContextModel, bool] = update_or_create_object(
            self.database, ContextModel, context_uuid, {'context_uuid': context_uuid})
        db_context, updated = result

        for topology_id in request.topology_ids:
            topology_uuid = topology_id.topology_uuid.uuid
            get_object(self.database, TopologyModel, [context_uuid, topology_uuid]) # just to confirm it exists

        for service_id in request.service_ids:
            service_uuid = service_id.service_uuid.uuid
            get_object(self.database, ServiceModel, [context_uuid, service_uuid]) # just to confirm it exists

        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        dict_context_id = db_context.dump_id()
        notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
        return ContextId(**dict_context_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Delete a context and publish a REMOVE event.

        If load() reports the context was not found, nothing is deleted and no event is published.
        An Empty message is returned in both cases.
        """
        context_uuid = request.context_uuid.uuid
        db_context = ContextModel(self.database, context_uuid, auto_load=False)
        found = db_context.load()
        if not found: return Empty()

        # Capture the id before deleting so it can still be reported in the event.
        dict_context_id = db_context.dump_id()
        db_context.delete()

        event_type = EventTypeEnum.EVENTTYPE_REMOVE
        notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
        # Previously this path fell off the end and returned None instead of the declared Empty reply.
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Yield a ContextEvent for each message consumed from the context topic."""
        topics = {TOPIC_CONTEXT}
        for broker_message in self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT):
            event_fields = json.loads(broker_message.content)
            yield ContextEvent(**event_fields)


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the ids of the topologies related to the requested context, ordered by `pk`."""
        db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
        related_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
        ordered_topologies = sorted(related_topologies, key=operator.attrgetter('pk'))
        topology_ids = [db_topology.dump_id() for db_topology in ordered_topologies]
        return TopologyIdList(topology_ids=topology_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the dump of the topologies related to the requested context, ordered by `pk`."""
        db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
        related_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
        ordered_topologies = sorted(related_topologies, key=operator.attrgetter('pk'))
        dumped_topologies = [db_topology.dump() for db_topology in ordered_topologies]
        return TopologyList(topologies=dumped_topologies)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        """Look up one topology by (context UUID, topology UUID) and return it with devices and links."""
        context_uuid = request.context_id.context_uuid.uuid
        topology_uuid = request.topology_uuid.uuid
        str_topology_key = key_to_str([context_uuid, topology_uuid])
        db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
        topology_fields = db_topology.dump(include_devices=True, include_links=True)
        return Topology(**topology_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        """Create or update a topology, relate it to the listed devices and links, and publish an event.

        The owning context, every listed device and every listed link are fetched with get_object
        before the corresponding record or relation is written. The published event is UPDATE when
        update_or_create_object reports the topology was updated, CREATE otherwise.

        Returns the TopologyId built from the stored topology's dump_id().
        """
        grpc_topology_id = request.topology_id
        context_uuid = grpc_topology_id.context_id.context_uuid.uuid
        db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

        topology_uuid = grpc_topology_id.topology_uuid.uuid
        str_topology_key = key_to_str([context_uuid, topology_uuid])
        topology_fields = {'context_fk': db_context, 'topology_uuid': topology_uuid}
        db_topology, updated = update_or_create_object(
            self.database, TopologyModel, str_topology_key, topology_fields)

        # Topology <-> device relations, keyed as '<topology key>--<device uuid>'.
        for device_id in request.device_ids:
            device_uuid = device_id.device_uuid.uuid
            db_device = get_object(self.database, DeviceModel, device_uuid)
            str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
            update_or_create_object(
                self.database, TopologyDeviceModel, str_topology_device_key,
                {'topology_fk': db_topology, 'device_fk': db_device})

        # Topology <-> link relations, keyed as '<topology key>--<link uuid>'.
        for link_id in request.link_ids:
            link_uuid = link_id.link_uuid.uuid
            db_link = get_object(self.database, LinkModel, link_uuid)
            str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
            update_or_create_object(
                self.database, TopologyLinkModel, str_topology_link_key,
                {'topology_fk': db_topology, 'link_fk': db_link})

        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
        else:
            event_type = EventTypeEnum.EVENTTYPE_CREATE
        dict_topology_id = db_topology.dump_id()
        notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
        return TopologyId(**dict_topology_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        """Delete a topology and publish a REMOVE event; do nothing if load() does not find it."""
        str_topology_key = key_to_str([request.context_id.context_uuid.uuid, request.topology_uuid.uuid])
        db_topology = TopologyModel(self.database, str_topology_key, auto_load=False)
        if not db_topology.load():
            return Empty()

        # Capture the id before deleting so it can still be reported in the event.
        dict_topology_id = db_topology.dump_id()
        db_topology.delete()

        notify_event(
            self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE,
            {'topology_id': dict_topology_id})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Yield a TopologyEvent for each message consumed from the topology topic."""
        topics = {TOPIC_TOPOLOGY}
        for broker_message in self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT):
            event_fields = json.loads(broker_message.content)
            yield TopologyEvent(**event_fields)


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the ids of every DeviceModel in the database, ordered by the `pk` attribute."""
        ordered_devices : List[DeviceModel] = sorted(
            get_all_objects(self.database, DeviceModel), key=operator.attrgetter('pk'))
        device_ids = [db_device.dump_id() for db_device in ordered_devices]
        return DeviceIdList(device_ids=device_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return the dump of every DeviceModel in the database, ordered by the `pk` attribute."""
        ordered_devices : List[DeviceModel] = sorted(
            get_all_objects(self.database, DeviceModel), key=operator.attrgetter('pk'))
        dumped_devices = [db_device.dump() for db_device in ordered_devices]
        return DeviceList(devices=dumped_devices)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        """Look up one device by UUID and return it with config rules, drivers and endpoints included."""
        db_device : DeviceModel = get_object(self.database, DeviceModel, request.device_uuid.uuid)
        device_fields = db_device.dump(
            include_config_rules=True, include_drivers=True, include_endpoints=True)
        return Device(**device_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
        """Create or update a device with its config, drivers and endpoints, then publish an event.

        Each endpoint may carry its own device UUID; when non-empty it must equal the UUID of the
        device being set, otherwise InvalidArgumentException is raised before anything is written.
        Endpoints that name both a topology context UUID and a topology UUID are additionally tied
        to that topology, and the device is related to the topology if it was not already.

        Returns the DeviceId built from the stored device's dump_id().
        """
        device_uuid = request.device_id.device_uuid.uuid

        # NOTE(review): the field path reported below omits `endpoint_id`; it is kept verbatim in case
        # callers or tests match on the message text -- confirm before changing.
        for index, grpc_endpoint in enumerate(request.device_endpoints):
            declared_device_uuid = grpc_endpoint.endpoint_id.device_id.device_uuid.uuid
            # An empty device UUID in the endpoint is accepted as referring to this device.
            if len(declared_device_uuid) == 0 or declared_device_uuid == device_uuid: continue
            raise InvalidArgumentException(
                'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(index), declared_device_uuid,
                ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])

        raw_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
        running_config_result = update_config(self.database, device_uuid, 'running', raw_config_rules)
        db_running_config = running_config_result[0][0]

        device_fields = {
            'device_uuid'              : device_uuid,
            'device_type'              : request.device_type,
            'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
            'device_config_fk'         : db_running_config,
        }
        db_device, updated = update_or_create_object(self.database, DeviceModel, device_uuid, device_fields)

        set_drivers(self.database, db_device, request.device_drivers)

        for grpc_endpoint in request.device_endpoints:
            grpc_endpoint_id = grpc_endpoint.endpoint_id
            endpoint_uuid = grpc_endpoint_id.endpoint_uuid.uuid

            str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
            endpoint_attributes = {
                'device_fk'    : db_device,
                'endpoint_uuid': endpoint_uuid,
                'endpoint_type': grpc_endpoint.endpoint_type,
            }

            topology_context_uuid = grpc_endpoint_id.topology_id.context_id.context_uuid.uuid
            topology_uuid = grpc_endpoint_id.topology_id.topology_uuid.uuid
            if len(topology_context_uuid) > 0 and len(topology_uuid) > 0:
                str_topology_key = key_to_str([topology_context_uuid, topology_uuid])
                db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)

                # Make sure the device is related to the topology the endpoint refers to.
                str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
                get_or_create_object(
                    self.database, TopologyDeviceModel, str_topology_device_key,
                    {'topology_fk': db_topology, 'device_fk': db_device})

                # Topology-scoped endpoints get the topology key appended to their own key.
                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
                endpoint_attributes['topology_fk'] = db_topology

            db_endpoint, _ = update_or_create_object(
                self.database, EndPointModel, str_endpoint_key, endpoint_attributes)

            set_kpi_sample_types(self.database, db_endpoint, grpc_endpoint.kpi_sample_types)

        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
        else:
            event_type = EventTypeEnum.EVENTTYPE_CREATE
        dict_device_id = db_device.dump_id()
        notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
        return DeviceId(**dict_device_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        """Delete a device together with the records that reference it, then publish a REMOVE event.

        Deleted, in order: each endpoint (after its KPI sample types), the topology-device
        relations, the drivers, the config rules of the device's config, the device itself and
        finally the config. If load() does not find the device, nothing is deleted and no event
        is published. An Empty message is returned in both cases.
        """
        device_uuid = request.device_uuid.uuid
        db_device = DeviceModel(self.database, device_uuid, auto_load=False)
        if not db_device.load():
            return Empty()

        # Capture the id before deleting so it can still be reported in the event.
        dict_device_id = db_device.dump_id()

        for endpoint_pk, _ in db_device.references(EndPointModel):
            db_endpoint = EndPointModel(self.database, endpoint_pk)
            for kpi_sample_type_pk, _ in db_endpoint.references(KpiSampleTypeModel):
                KpiSampleTypeModel(self.database, kpi_sample_type_pk).delete()
            db_endpoint.delete()

        for topology_device_pk, _ in db_device.references(TopologyDeviceModel):
            TopologyDeviceModel(self.database, topology_device_pk).delete()

        for driver_pk, _ in db_device.references(DriverModel):
            DriverModel(self.database, driver_pk).delete()

        # The config is resolved while the device still exists, but deleted only after the device.
        db_config = ConfigModel(self.database, db_device.device_config_fk)
        for config_rule_pk, _ in db_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, config_rule_pk).delete()

        db_device.delete()
        db_config.delete()

        notify_event(
            self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE,
            {'device_id': dict_device_id})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Yield a DeviceEvent for each message consumed from the device topic."""
        topics = {TOPIC_DEVICE}
        for broker_message in self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT):
            event_fields = json.loads(broker_message.content)
            yield DeviceEvent(**event_fields)


    # ----- Link -------------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the ids of every LinkModel in the database, ordered by the `pk` attribute."""
        ordered_links : List[LinkModel] = sorted(
            get_all_objects(self.database, LinkModel), key=operator.attrgetter('pk'))
        link_ids = [db_link.dump_id() for db_link in ordered_links]
        return LinkIdList(link_ids=link_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        """Return the dump of every LinkModel in the database, ordered by the `pk` attribute."""
        ordered_links : List[LinkModel] = sorted(
            get_all_objects(self.database, LinkModel), key=operator.attrgetter('pk'))
        dumped_links = [db_link.dump() for db_link in ordered_links]
        return LinkList(links=dumped_links)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        """Look up one link by UUID and return its dump as a Link message."""
        db_link : LinkModel = get_object(self.database, LinkModel, request.link_uuid.uuid)
        link_fields = db_link.dump()
        return Link(**link_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
        """Create or update a link, attach the listed endpoints to it, and publish an event.

        Each endpoint is fetched with get_object. When an endpoint names both a topology context
        UUID and a topology UUID, the topology and the topology-device relation are fetched as
        well, and the link is related to that topology if it was not already.

        Returns the LinkId built from the stored link's dump_id().
        """
        link_uuid = request.link_id.link_uuid.uuid
        db_link, updated = update_or_create_object(
            self.database, LinkModel, link_uuid, {'link_uuid': link_uuid})

        for grpc_endpoint_id in request.link_endpoint_ids:
            endpoint_uuid         = grpc_endpoint_id.endpoint_uuid.uuid
            endpoint_device_uuid  = grpc_endpoint_id.device_id.device_uuid.uuid
            topology_uuid         = grpc_endpoint_id.topology_id.topology_uuid.uuid
            topology_context_uuid = grpc_endpoint_id.topology_id.context_id.context_uuid.uuid

            str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])

            db_topology = None
            if len(topology_context_uuid) > 0 and len(topology_uuid) > 0:
                str_topology_key = key_to_str([topology_context_uuid, topology_uuid])
                db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
                str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--')
                get_object(self.database, TopologyDeviceModel, str_topology_device_key) # check device is in topology
                # Topology-scoped endpoints carry the topology key appended to their own key.
                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')

            db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)

            # NOTE(review): this key uses the device UUID rather than the endpoint key, so two endpoints
            # of the same device on one link would share a key -- confirm this is intended.
            str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--')
            get_or_create_object(
                self.database, LinkEndPointModel, str_link_endpoint_key,
                {'link_fk': db_link, 'endpoint_fk': db_endpoint})

            if db_topology is None: continue

            # str_topology_key is always bound here: db_topology is only set in the branch that sets it.
            str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
            get_or_create_object(
                self.database, TopologyLinkModel, str_topology_link_key,
                {'topology_fk': db_topology, 'link_fk': db_link})

        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
        else:
            event_type = EventTypeEnum.EVENTTYPE_CREATE
        dict_link_id = db_link.dump_id()
        notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
        return LinkId(**dict_link_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        """Delete a link with its endpoint and topology relations, then publish a REMOVE event.

        If load() does not find the link, nothing is deleted and no event is published.
        An Empty message is returned in both cases.
        """
        db_link = LinkModel(self.database, request.link_uuid.uuid, auto_load=False)
        if not db_link.load():
            return Empty()

        # Capture the id before deleting so it can still be reported in the event.
        dict_link_id = db_link.dump_id()

        for link_endpoint_pk, _ in db_link.references(LinkEndPointModel):
            LinkEndPointModel(self.database, link_endpoint_pk).delete()

        for topology_link_pk, _ in db_link.references(TopologyLinkModel):
            TopologyLinkModel(self.database, topology_link_pk).delete()

        db_link.delete()

        notify_event(
            self.messagebroker, TOPIC_LINK, EventTypeEnum.EVENTTYPE_REMOVE,
            {'link_id': dict_link_id})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Yield a LinkEvent for each message consumed from the link topic."""
        topics = {TOPIC_LINK}
        for broker_message in self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT):
            event_fields = json.loads(broker_message.content)
            yield LinkEvent(**event_fields)


    # ----- Service ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        """Return the ids of the services related to the requested context, ordered by `pk`."""
        context_uuid = request.context_uuid.uuid
        db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
        related_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
        ordered_services = sorted(related_services, key=operator.attrgetter('pk'))
        service_ids = [db_service.dump_id() for db_service in ordered_services]
        return ServiceIdList(service_ids=service_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        """Return the dump of the services related to the requested context, ordered by `pk`."""
        context_uuid = request.context_uuid.uuid
        db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
        related_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
        ordered_services = sorted(related_services, key=operator.attrgetter('pk'))
        dumped_services = [db_service.dump() for db_service in ordered_services]
        return ServiceList(services=dumped_services)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        """Look up one service by (context UUID, service UUID) and return it fully expanded.

        The returned message includes endpoint ids, constraints and config rules.
        """
        context_uuid = request.context_id.context_uuid.uuid
        service_uuid = request.service_uuid.uuid
        str_service_key = key_to_str([context_uuid, service_uuid])
        db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
        service_fields = db_service.dump(
            include_endpoint_ids=True, include_constraints=True, include_config_rules=True)
        return Service(**service_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        """Create or update a service, link its endpoints and publish the matching event.

        Args:
            request: Service to store. Its identifier, type, status, constraints, config rules
                and endpoint identifiers are read.
            context: gRPC servicer context; not used directly by this method.

        Returns:
            ServiceId built from the identifier dumped by the stored service.

        Raises:
            InvalidArgumentException: if an endpoint carries a non-empty topology context UUID
                that differs from the context UUID of the service.
        """
        context_uuid = request.service_id.context_id.context_uuid.uuid
        # NOTE(review): get_object presumably fails when the context does not exist -- confirm.
        db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

        # Validate every endpoint before anything is written to the database.
        for i,endpoint_id in enumerate(request.service_endpoint_ids):
            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
            if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
                raise InvalidArgumentException(
                    'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                    endpoint_topology_context_uuid,
                    ['should be == {:s}({:s})'.format('request.service_id.context_id.context_uuid.uuid', context_uuid)])

        service_uuid = request.service_id.service_uuid.uuid
        str_service_key = key_to_str([context_uuid, service_uuid])

        # Constraints and running config are stored under the service key; element [0][0] of each
        # result is what gets referenced from the service record below.
        constraints_result = set_constraints(
            self.database, str_service_key, 'constraints', request.service_constraints)
        db_constraints = constraints_result[0][0]

        config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
        running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
        db_running_config = running_config_result[0][0]

        db_service, updated = update_or_create_object(self.database, ServiceModel, str_service_key, {
            'context_fk'            : db_context,
            'service_uuid'          : service_uuid,
            'service_type'          : grpc_to_enum__service_type(request.service_type),
            'service_constraints_fk': db_constraints,
            'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
            'service_config_fk'     : db_running_config,
        })

        for endpoint_id in request.service_endpoint_ids:
            endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
            endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
            endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid

            # The endpoint key is device/endpoint, extended with the topology key when one is given.
            str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')

            db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)

            # Key the link on the full endpoint key rather than only the device UUID; otherwise two
            # endpoints of the same service on one device map to the same key and the second
            # endpoint is never linked.
            str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
            get_or_create_object(
                self.database, ServiceEndPointModel, str_service_endpoint_key, {
                    'service_fk': db_service, 'endpoint_fk': db_endpoint})

        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        dict_service_id = db_service.dump_id()
        notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
        return ServiceId(**dict_service_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        """Delete a service together with its endpoint links, config rules and constraints.

        Returns Empty both when the service was removed and when it could not be loaded; a
        remove event is published only in the former case.
        """
        str_service_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
        db_service = ServiceModel(self.database, str_service_key, auto_load=False)
        if not db_service.load():
            # Nothing stored under this key: no deletion and no event.
            return Empty()

        # Capture the identifier before the record is deleted; it is needed for the event.
        removed_service_id = db_service.dump_id()

        for service_endpoint_pk,_ in db_service.references(ServiceEndPointModel):
            db_service_endpoint = ServiceEndPointModel(self.database, service_endpoint_pk)
            db_service_endpoint.delete()

        db_config = ConfigModel(self.database, db_service.service_config_fk)
        for config_rule_pk,_ in db_config.references(ConfigRuleModel):
            db_config_rule = ConfigRuleModel(self.database, config_rule_pk)
            db_config_rule.delete()

        db_constraints = ConstraintsModel(self.database, db_service.service_constraints_fk)
        for constraint_pk,_ in db_constraints.references(ConstraintModel):
            db_constraint = ConstraintModel(self.database, constraint_pk)
            db_constraint.delete()

        # The service goes first, then the config and constraints containers it points to.
        db_service.delete()
        db_config.delete()
        db_constraints.delete()

        notify_event(
            self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE,
            {'service_id': removed_service_id})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Yield a ServiceEvent for each message consumed from the service topic.

        Each message's content is parsed as JSON and expanded into the ServiceEvent fields.
        """
        service_topics = {TOPIC_SERVICE}
        consumed_messages = self.messagebroker.consume(service_topics, consume_timeout=CONSUME_TIMEOUT)
        for consumed_message in consumed_messages:
            event_fields = json.loads(consumed_message.content)
            yield ServiceEvent(**event_fields)