Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 48 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import grpc, json, logging, operator, threading
from typing import Iterator, List, Set, Tuple, Union
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.message_broker.MessageBroker import MessageBroker
from context.service.Database import Database

from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty, EventTypeEnum,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
    ConfigActionEnum)
from common.proto.context_pb2_grpc import ContextServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
from sqlalchemy.orm import Session, contains_eager, selectinload
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
from context.service.database.ConfigModel import grpc_config_rules_to_raw
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel
from context.service.database.ConfigModel import ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel
from common.orm.backend.Tools import key_to_str

from ..database.KpiSampleType import grpc_to_enum__kpi_sample_type
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import set_constraints
from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Events import notify_event
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.LinkModel import LinkModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.RelationModels import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
    SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ServiceModel import (
    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.TopologyModel import TopologyModel
"""
from context.service.database.ContextModel import ContextModel
from context.service.database.TopologyModel import TopologyModel
from context.service.database.Events import notify_event
from context.service.database.EndPointModel import EndPointModel
from context.service.database.EndPointModel import KpiSampleTypeModel
from .Constants import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
    TOPIC_TOPOLOGY)

# Module-level logger used by the servicer and handed to the RPC decorator.
LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'Context'

# (singular, plural) spelling of every entity; each one contributes the same six RPC method names.
_ENTITY_NAMES = (
    ('Connection', 'Connections'),
    ('Context',    'Contexts'),
    ('Topology',   'Topologies'),
    ('Device',     'Devices'),
    ('Link',       'Links'),
    ('Service',    'Services'),
    ('Slice',      'Slices'),
)

# Expands to 'List<X>Ids', 'List<Xs>', 'Get<X>', 'Set<X>', 'Remove<X>', 'Get<X>Events' for each entity, in order.
METHOD_NAMES = [
    method_name
    for singular, plural in _ENTITY_NAMES
    for method_name in (
        'List' + singular + 'Ids', 'List' + plural, 'Get' + singular,
        'Set' + singular, 'Remove' + singular, 'Get' + singular + 'Events',
    )
]

# Metrics handle passed to safe_and_metered_rpc_method on every RPC method of the servicer.
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer):
    def __init__(self, session : Session, messagebroker : MessageBroker):
        """Create the servicer and attach its storage and event backends.

        Args:
            session: Stored as ``self.session`` and also wrapped in a ``Database`` helper.
                The RPC methods call it (``self.session()``) and use the result as a context
                manager, so it is presumably a session factory -- TODO confirm against the caller.
            messagebroker: Broker object stored as ``self.messagebroker``; the RPC methods pass it
                to ``notify_event`` and call its ``consume`` method.
        """
        LOGGER.debug('Creating Servicer...')
        # Lock taken by the RPC methods that wrap their body in `with self.lock`.
        self.lock = threading.Lock()
        self.session = session
        self.database = Database(session)
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the identifiers of all stored contexts.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``ContextIdList`` holding ``dump_id()`` of every ``ContextModel`` row.
        """
        with self.session() as db_session:
            db_contexts = db_session.query(ContextModel).all()

        # The rows are serialised once the session block has been left.
        context_ids = []
        for db_context in db_contexts:
            context_ids.append(db_context.dump_id())
        return ContextIdList(context_ids=context_ids)


    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        """Return all stored contexts.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``ContextList`` holding ``dump()`` of every ``ContextModel`` row.
        """
        with self.session() as db_session:
            db_contexts = db_session.query(ContextModel).all()

        # The rows are serialised once the session block has been left.
        dumped_contexts = [db_context.dump() for db_context in db_contexts]
        return ContextList(contexts=dumped_contexts)


    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        """Return the context whose UUID matches the request.

        Args:
            request: Identifier of the context to fetch.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``Context`` built from the ``dump()`` of the matching row.

        Raises:
            NotFoundException: If no ``ContextModel`` row has the requested UUID.
        """
        requested_uuid = request.context_uuid.uuid
        with self.session() as db_session:
            lookup = db_session.query(ContextModel).filter_by(context_uuid=requested_uuid)
            db_context = lookup.one_or_none()

        if db_context:
            return Context(**db_context.dump())

        # Entity name reported in the error is the model class name without its 'Model' suffix.
        entity_name = ContextModel.__name__.replace('Model', '')
        raise NotFoundException(entity_name, requested_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        """Create or update a context and emit the matching event.

        Every topology id and service id listed in the request must carry the same context UUID
        as the context itself.

        Args:
            request: Context to store.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``ContextId`` built from ``dump_id()`` of the stored model.

        Raises:
            InvalidArgumentException: If a listed topology or service id names a different context.
        """
        context_uuid = request.context_id.context_uuid.uuid
        expected_field = 'request.context_id.context_uuid.uuid'

        for index, topology_id in enumerate(request.topology_ids):
            owner_uuid = topology_id.context_id.context_uuid.uuid
            if owner_uuid == context_uuid:
                continue
            raise InvalidArgumentException(
                'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(index), owner_uuid,
                ['should be == {:s}({:s})'.format(expected_field, context_uuid)])

        for index, service_id in enumerate(request.service_ids):
            owner_uuid = service_id.context_id.context_uuid.uuid
            if owner_uuid == context_uuid:
                continue
            raise InvalidArgumentException(
                'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(index), owner_uuid,
                ['should be == {:s}({:s})'.format(expected_field, context_uuid)])

        db_context = ContextModel(context_uuid=context_uuid)
        with self.session() as db_session:
            # Any pre-existing row with this UUID means the call is an update rather than a create.
            existing_rows = db_session.query(ContextModel).filter_by(context_uuid=context_uuid).all()
            already_stored = bool(existing_rows)
            db_session.merge(db_context)
            db_session.commit()

        if already_stored:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
        else:
            event_type = EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': db_context.dump_id()})
        return ContextId(**db_context.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        """Delete a context and emit a REMOVE event for it.

        Args:
            request: Identifier of the context to delete.
            context: gRPC servicer context; not read by this method.

        Returns:
            ``Empty()`` in every case: when the context does not exist (nothing is deleted and no
            event is emitted) and after a successful delete.
        """
        context_uuid = request.context_uuid.uuid

        with self.session() as session:
            result = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none()
            if not result:
                return Empty()
            # Capture the identifier while the row still exists: once the delete has been committed
            # the loaded instance may no longer be readable.
            dict_context_id = result.dump_id()

            session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
            session.commit()
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
            # The declared return type is Empty; falling off the end would hand None back to gRPC.
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Stream context events read from the message broker.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Yields:
            One ``ContextEvent`` per message obtained from ``self.messagebroker.consume`` on
            ``TOPIC_CONTEXT``; each message's ``content`` is parsed as JSON and expanded into the
            event's fields.
        """
        for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
            yield ContextEvent(**json.loads(message.content))


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the identifiers of the topologies attached to a context.

        Args:
            request: Identifier of the owning context.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``TopologyIdList`` holding ``dump_id()`` of each entry in the context's ``topology``
            relationship.

        Raises:
            NotFoundException: If the context does not exist.
        """
        context_uuid = request.context_uuid.uuid
        with self.session() as db_session:
            # selectinload fetches the topology relationship together with the context row.
            lookup = db_session.query(ContextModel).options(selectinload(ContextModel.topology))
            db_context = lookup.filter_by(context_uuid=context_uuid).one_or_none()
            if not db_context:
                raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

            topology_ids = []
            for db_topology in db_context.topology:
                topology_ids.append(db_topology.dump_id())
            return TopologyIdList(topology_ids=topology_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the topologies attached to a context.

        Args:
            request: Identifier of the owning context.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``TopologyList`` holding ``dump()`` of each entry in the context's ``topology``
            relationship.

        Raises:
            NotFoundException: If the context does not exist.
        """
        context_uuid = request.context_uuid.uuid

        with self.session() as db_session:
            # selectinload fetches the topology relationship together with the context row.
            lookup = db_session.query(ContextModel).options(selectinload(ContextModel.topology))
            db_context = lookup.filter_by(context_uuid=context_uuid).one_or_none()
            if not db_context:
                raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

            dumped_topologies = []
            for db_topology in db_context.topology:
                dumped_topologies.append(db_topology.dump())
            return TopologyList(topologies=dumped_topologies)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the topology whose UUID matches the request.

        The lookup filters on the topology UUID only; the context id carried by the request is not
        part of the filter.

        Args:
            request: Identifier of the topology to fetch.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``Topology`` built from the ``dump()`` of the matching row.

        Raises:
            NotFoundException: If no matching topology row is found.
        """
        topology_uuid = request.topology_uuid.uuid

        with self.session() as session:
            # Join the owning context and populate TopologyModel.context from that same join.
            result = session.query(TopologyModel).join(TopologyModel.context).filter(
                TopologyModel.topology_uuid==topology_uuid).options(
                    contains_eager(TopologyModel.context)).one_or_none()

            if not result:
                raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid)
            return Topology(**result.dump())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        """Create or update a topology and emit the matching event.

        Args:
            request: Topology to store; its id carries both the context UUID and the topology UUID.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``TopologyId`` built from ``dump_id()`` of the row read back after the commit.
        """
        topology_id = request.topology_id
        context_uuid = topology_id.context_id.context_uuid.uuid
        topology_uuid = topology_id.topology_uuid.uuid

        with self.session() as db_session:
            def find_topology():
                # Topology row joined with its owning context, looked up by topology UUID.
                return db_session.query(TopologyModel).join(TopologyModel.context).filter(
                    TopologyModel.topology_uuid==topology_uuid).one_or_none()

            new_topology = TopologyModel(topology_uuid=topology_uuid, context_uuid=context_uuid)
            # A row found before the merge means this call is an update rather than a create.
            already_stored = bool(find_topology())
            db_session.merge(new_topology)
            db_session.commit()

            # Read the stored row back so the returned id comes from the database state.
            db_topology = find_topology()
            if already_stored:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
            else:
                event_type = EventTypeEnum.EVENTTYPE_CREATE
            stored_id = db_topology.dump_id()
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': stored_id})
            return TopologyId(**stored_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        """Delete a topology and emit a REMOVE event for it.

        Args:
            request: Identifier (context UUID and topology UUID) of the topology to delete.
            context: gRPC servicer context; not read by this method.

        Returns:
            ``Empty()``, both when the topology does not exist and after a successful delete.
        """
        criteria = {
            'topology_uuid': request.topology_uuid.uuid,
            'context_uuid': request.context_id.context_uuid.uuid,
        }

        with self.session() as db_session:
            db_topology = db_session.query(TopologyModel).filter_by(**criteria).one_or_none()
            if db_topology:
                # The identifier is dumped before the row is deleted.
                removed_id = db_topology.dump_id()

                db_session.query(TopologyModel).filter_by(**criteria).delete()
                db_session.commit()
                notify_event(
                    self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'topology_id': removed_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Stream topology events read from the message broker.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Yields:
            One ``TopologyEvent`` per message obtained from ``self.messagebroker.consume`` on
            ``TOPIC_TOPOLOGY``; each message's ``content`` is parsed as JSON and expanded into the
            event's fields.
        """
        for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
            yield TopologyEvent(**json.loads(message.content))


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the identifiers of all stored devices.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``DeviceIdList`` holding ``dump_id()`` of every ``DeviceModel`` row.
        """
        with self.session() as db_session:
            device_ids = []
            for db_device in db_session.query(DeviceModel).all():
                device_ids.append(db_device.dump_id())
            return DeviceIdList(device_ids=device_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return all stored devices.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``DeviceList`` holding ``dump()`` of every ``DeviceModel`` row.
        """
        with self.session() as session:
            result = session.query(DeviceModel).all()
            # DeviceList.devices carries full devices, so use dump() (as GetDevice does), not dump_id().
            return DeviceList(devices=[device.dump() for device in result])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        """Return the device whose UUID matches the request.

        Args:
            request: Identifier of the device to fetch.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``Device`` built from the ``dump()`` of the matching row.

        Raises:
            NotFoundException: If no ``DeviceModel`` row has the requested UUID.
        """
        device_uuid = request.device_uuid.uuid
        with self.session() as db_session:
            lookup = db_session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid)
            db_device = lookup.one_or_none()
            if db_device:
                # Serialised while the session is still open.
                return Device(**db_device.dump())
            raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
        """Create or update a device together with its config rules, drivers and endpoints.

        Steps, in order: validate that every endpoint belongs to this device, store the config
        rules through ``update_config``, create or update the device row, store its drivers, then
        create or update each endpoint and its KPI sample types.

        Args:
            request: Device to store.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``DeviceId`` built from ``dump_id()`` of the stored device.

        Raises:
            InvalidArgumentException: If an endpoint names a device UUID different from the device's.
        """
        device_uuid = request.device_id.device_uuid.uuid
        for i,endpoint in enumerate(request.device_endpoints):
            endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
            # An endpoint without a device UUID is treated as belonging to this device.
            if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
            if device_uuid != endpoint_device_uuid:
                raise InvalidArgumentException(
                    'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
                    ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])

        config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
        running_config_result = self.update_config(device_uuid, 'running', config_rules)
        # update_config returns the config object first; the device row references its UUID.
        db_running_config = running_config_result[0][0]
        config_uuid = db_running_config.config_uuid

        new_obj = DeviceModel(**{
            'device_uuid'               : device_uuid,
            'device_type'               : request.device_type,
            'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status),
            'device_config_uuid'        : config_uuid,
        })
        # `updated` describes the device itself; it is only needed by the disabled notification below.
        db_device, updated = self.database.create_or_update(new_obj)

        self.set_drivers(db_device, request.device_drivers)

        for endpoint in request.device_endpoints:
            endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
            endpoint_attributes = {
                'device_uuid'    : db_device.device_uuid,
                'endpoint_uuid': endpoint_uuid,
                'endpoint_type': endpoint.endpoint_type,
            }

            endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
            endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                # NOTE(review): assumes get_object raises when the topology is missing -- confirm.
                db_topology : TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid)
                endpoint_attributes['topology_uuid'] = db_topology.topology_uuid

            new_endpoint = EndPointModel(**endpoint_attributes)
            # Discard the per-endpoint flag so it cannot overwrite the device-level `updated`.
            db_endpoint, _ = self.database.create_or_update(new_endpoint)

            self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types)

        # TODO(review): device event notification is currently disabled.
        # event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        dict_device_id = db_device.dump_id()
        # notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})

        return DeviceId(**dict_device_id)

    def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types):
        """Store one ``KpiSampleTypeModel`` row per KPI sample type of an endpoint.

        Args:
            db_endpoint: Endpoint whose ``endpoint_uuid`` is written into each row.
            grpc_endpoint_kpi_sample_types: Iterable of gRPC KPI sample type values; each is
                converted with ``grpc_to_enum__kpi_sample_type`` and stored by enum name.
        """
        endpoint_uuid = db_endpoint.endpoint_uuid
        for grpc_sample_type in grpc_endpoint_kpi_sample_types:
            orm_sample_type = grpc_to_enum__kpi_sample_type(grpc_sample_type)
            # Each row gets a freshly generated random UUID as its kpi_uuid.
            db_sample_type = KpiSampleTypeModel(
                endpoint_uuid=endpoint_uuid,
                kpi_sample_type=orm_sample_type.name,
                kpi_uuid=str(uuid.uuid4()),
            )
            self.database.create(db_sample_type)

    def set_drivers(self, db_device: DeviceModel, grpc_device_drivers):
        """Store one ``DriverModel`` row per driver of a device.

        Args:
            db_device: Device the drivers belong to.
            grpc_device_drivers: Iterable of gRPC driver values; each is converted with
                ``grpc_to_enum__device_driver``.
        """
        db_device_pk = db_device.device_uuid
        for driver in grpc_device_drivers:
            orm_driver = grpc_to_enum__device_driver(driver)
            driver_config = {
                # Each row gets a freshly generated random UUID.
                "driver_uuid": str(uuid.uuid4()),
                "device_uuid": db_device_pk,
                "driver": orm_driver.name
            }
            db_device_driver = DriverModel(**driver_config)
            db_device_driver.device_fk = db_device
            # Replaces the enum name passed to the constructor with the enum member itself.
            db_device_driver.driver = orm_driver

            self.database.create_or_update(db_device_driver)

    def update_config(
            self, db_parent_pk: str, config_name: str,
            raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]]
    ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
        """Get or create the config of a parent object and apply a list of config rules to it.

        Args:
            db_parent_pk: Key passed to ``self.database.get_or_create`` to find the config.
            config_name: Currently unused.
                NOTE(review): the config is looked up by ``db_parent_pk`` alone -- confirm whether
                the lookup key should also include the config name.
            raw_config_rules: ``(action, resource_key, resource_value)`` tuples; the list index is
                passed to ``set_config_rule`` as the rule position.

        Returns:
            A list of ``(object, flag)`` tuples: first the config with the flag returned by
            ``get_or_create``, then one entry per SET rule with the flag returned by
            ``set_config_rule``. DELETE rules add no entry.

        Raises:
            AttributeError: Intended for an action that is neither SET nor DELETE.
        """
        db_config, created = self.database.get_or_create(ConfigModel, db_parent_pk)

        LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump()))

        db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]

        for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
            if action == ORM_ConfigActionEnum.SET:
                db_config_rule, updated = self.set_config_rule(
                    db_config, position, resource_key, resource_value)
                db_objects.append((db_config_rule, updated))
            elif action == ORM_ConfigActionEnum.DELETE:
                self.delete_config_rule(db_config, resource_key)
            else:
                # NOTE(review): `action` is compared against ORM_ConfigActionEnum above but is named
                # here through the gRPC ConfigActionEnum -- confirm this lookup accepts it.
                msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
                raise AttributeError(
                    msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))

        return db_objects

    def set_config_rule(
            self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str
    ) -> Tuple[ConfigRuleModel, bool]:
        """Create or update the SET rule for ``resource_key`` in a config.

        The rule's primary key is a UUIDv5 derived from ``'<config_uuid>:<hash of resource_key>'``,
        so the same config and resource key always map to the same row.

        Args:
            db_config: Config the rule belongs to.
            position: Position stored with the rule.
            resource_key: Key of the configured resource.
            resource_value: Value stored for the resource.

        Returns:
            The ``(rule, flag)`` tuple returned by ``self.database.create_or_update``.
        """
        # Imported locally, using the same package path as the other database imports of this file.
        from context.service.database.Tools import fast_hasher
        str_rule_key_hash = fast_hasher(resource_key)
        str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
        # Fixed namespace UUID: keeps the derived primary key stable across calls.
        pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key))
        data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position,
                'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value}
        to_add = ConfigRuleModel(**data)

        result, updated = self.database.create_or_update(to_add)
        return result, updated

    def delete_config_rule(
            self, db_config: ConfigModel, resource_key: str
    ) -> None:
        """Delete the rule stored for ``resource_key`` in a config, if there is one.

        The lookup key is derived exactly as ``set_config_rule`` derives the stored primary key:
        a UUIDv5 of ``'<config_uuid>:<hash of resource_key>'`` in the same namespace.

        Args:
            db_config: Config the rule belongs to.
            resource_key: Key of the resource whose rule is removed.
        """
        # Imported locally, using the same package path as the other database imports of this file.
        from context.service.database.Tools import fast_hasher
        str_rule_key_hash = fast_hasher(resource_key)
        str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
        # Must match the namespace UUID used by set_config_rule, otherwise the rule is never found.
        pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key))

        db_config_rule = self.database.get_object(ConfigRuleModel, pk, raise_if_not_found=False)

        if db_config_rule is None:
            return
        # NOTE(review): relies on the returned object exposing delete() -- confirm for ConfigRuleModel.
        db_config_rule.delete()

    def delete_all_config_rules(self, db_config: ConfigModel) -> None:
        """Delete every ``ConfigRuleModel`` referenced by ``db_config``.

        Args:
            db_config: Config whose ``references(ConfigRuleModel)`` yields ``(pk, _)`` pairs; a rule
                object is built from each pk and deleted.
        """
        for rule_pk, _ in db_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, rule_pk).delete()

        """
        for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
            if action == ORM_ConfigActionEnum.SET:
                result: Tuple[ConfigRuleModel, bool] = set_config_rule(
                    database, db_config, position, resource_key, resource_value)
                db_config_rule, updated = result
                db_objects.append((db_config_rule, updated))
            elif action == ORM_ConfigActionEnum.DELETE:
                delete_config_rule(database, db_config, resource_key)
            else:
                msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
                raise AttributeError(
                    msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))

        return db_objects
        """

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        """Delete a device and emit a REMOVE event for it.

        NOTE(review): DeviceModel is built here as ``DeviceModel(database, uuid, auto_load=False)``
        and used through ``load()``/``delete()``, whereas SetDevice builds it from keyword
        attributes -- confirm this call style is still supported.

        Args:
            request: Identifier of the device to delete.
            context: gRPC servicer context; not read by this method.

        Returns:
            ``Empty()``, both when ``load()`` reports the device missing and after a delete.
        """
        with self.lock:
            device_uuid = request.device_uuid.uuid
            db_device = DeviceModel(self.database, device_uuid, auto_load=False)
            if db_device.load():
                # The identifier is dumped before the object is deleted.
                removed_id = db_device.dump_id()
                db_device.delete()
                notify_event(
                    self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'device_id': removed_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Stream device events read from the message broker.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Yields:
            One ``DeviceEvent`` per message obtained from ``self.messagebroker.consume`` on
            ``TOPIC_DEVICE``; each message's ``content`` is parsed as JSON and expanded into the
            event's fields.
        """
        for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield DeviceEvent(**json.loads(message.content))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Link -------------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the identifiers of all stored links, ordered by their ``pk`` attribute.

        NOTE(review): ``get_all_objects`` is not among the imports visible in this part of the
        file -- confirm it is in scope.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``LinkIdList`` holding ``dump_id()`` of every link.
        """
        with self.lock:
            ordered_links = list(get_all_objects(self.database, LinkModel))
            ordered_links.sort(key=operator.attrgetter('pk'))
            link_ids = []
            for db_link in ordered_links:
                link_ids.append(db_link.dump_id())
            return LinkIdList(link_ids=link_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        """Return all stored links, ordered by their ``pk`` attribute.

        NOTE(review): ``get_all_objects`` is not among the imports visible in this part of the
        file -- confirm it is in scope.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``LinkList`` holding ``dump()`` of every link.
        """
        with self.lock:
            ordered_links = list(get_all_objects(self.database, LinkModel))
            ordered_links.sort(key=operator.attrgetter('pk'))
            dumped_links = []
            for db_link in ordered_links:
                dumped_links.append(db_link.dump())
            return LinkList(links=dumped_links)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        """Return the link whose UUID matches the request.

        NOTE(review): ``get_object`` is not among the imports visible in this part of the file --
        confirm it is in scope and how it reports a missing link.

        Args:
            request: Identifier of the link to fetch.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``Link`` built from the ``dump()`` of the link object.
        """
        with self.lock:
            db_link : LinkModel = get_object(self.database, LinkModel, request.link_uuid.uuid)
            dumped_link = db_link.dump()
            return Link(**dumped_link)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
        """Create or update a link, attach its endpoints, and emit the matching event.

        For each endpoint id of the request the endpoint object is fetched and a link/endpoint
        relation is obtained or created. When the endpoint id names a topology (both context UUID
        and topology UUID non-empty), the topology and the topology/device relation are fetched
        first, and a topology/link relation is obtained or created as well.

        NOTE(review): ``update_or_create_object``, ``get_object`` and ``get_or_create_object`` are
        not among the imports visible in this part of the file -- confirm they are in scope.

        Args:
            request: Link to store.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``LinkId`` built from ``dump_id()`` of the stored link.
        """
        with self.lock:
            link_uuid = request.link_id.link_uuid.uuid
            db_link, updated = update_or_create_object(
                self.database, LinkModel, link_uuid, {'link_uuid': link_uuid})

            for grpc_endpoint_id in request.link_endpoint_ids:
                device_uuid = grpc_endpoint_id.device_id.device_uuid.uuid
                topology_uuid = grpc_endpoint_id.topology_id.topology_uuid.uuid
                topology_context_uuid = grpc_endpoint_id.topology_id.context_id.context_uuid.uuid

                endpoint_key = key_to_str([device_uuid, grpc_endpoint_id.endpoint_uuid.uuid])

                topology_key = None
                db_topology = None
                if topology_context_uuid and topology_uuid:
                    topology_key = key_to_str([topology_context_uuid, topology_uuid])
                    db_topology = get_object(self.database, TopologyModel, topology_key)
                    # Fetched only to check that the device belongs to the topology.
                    topology_device_key = key_to_str([topology_key, device_uuid], separator='--')
                    get_object(self.database, TopologyDeviceModel, topology_device_key)
                    # Endpoints that name a topology are keyed with the topology key appended.
                    endpoint_key = key_to_str([endpoint_key, topology_key], separator=':')

                db_endpoint = get_object(self.database, EndPointModel, endpoint_key)

                # The link/endpoint relation is keyed by link UUID and device UUID.
                link_endpoint_key = key_to_str([link_uuid, device_uuid], separator='--')
                get_or_create_object(
                    self.database, LinkEndPointModel, link_endpoint_key,
                    {'link_fk': db_link, 'endpoint_fk': db_endpoint})

                if db_topology is None:
                    continue
                topology_link_key = key_to_str([topology_key, link_uuid], separator='--')
                get_or_create_object(
                    self.database, TopologyLinkModel, topology_link_key,
                    {'topology_fk': db_topology, 'link_fk': db_link})

            if updated:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
            else:
                event_type = EventTypeEnum.EVENTTYPE_CREATE
            stored_id = db_link.dump_id()
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': stored_id})
            return LinkId(**stored_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        """Delete a link and emit a REMOVE event for it.

        Args:
            request: Identifier of the link to delete.
            context: gRPC servicer context; not read by this method.

        Returns:
            ``Empty()``, both when ``load()`` reports the link missing and after a delete.
        """
        with self.lock:
            db_link = LinkModel(self.database, request.link_uuid.uuid, auto_load=False)
            if db_link.load():
                # The identifier is dumped before the object is deleted.
                removed_id = db_link.dump_id()
                db_link.delete()
                notify_event(
                    self.messagebroker, TOPIC_LINK, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'link_id': removed_id})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Stream link events read from the message broker.

        Args:
            request: Not read by this method.
            context: gRPC servicer context; not read by this method.

        Yields:
            One ``LinkEvent`` per message obtained from ``self.messagebroker.consume`` on
            ``TOPIC_LINK``; each message's ``content`` is parsed as JSON and expanded into the
            event's fields.
        """
        for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
            yield LinkEvent(**json.loads(message.content))


    # ----- Service ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        """Return the identifiers of the services related to a context, ordered by ``pk``.

        NOTE(review): ``get_object`` and ``get_related_objects`` are not among the imports visible
        in this part of the file -- confirm they are in scope.

        Args:
            request: Identifier of the owning context.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``ServiceIdList`` holding ``dump_id()`` of every related service.
        """
        with self.lock:
            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            ordered_services = list(get_related_objects(db_context, ServiceModel))
            ordered_services.sort(key=operator.attrgetter('pk'))
            service_ids = []
            for db_service in ordered_services:
                service_ids.append(db_service.dump_id())
            return ServiceIdList(service_ids=service_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        """Return the services related to a context, ordered by ``pk``.

        NOTE(review): ``get_object`` and ``get_related_objects`` are not among the imports visible
        in this part of the file -- confirm they are in scope.

        Args:
            request: Identifier of the owning context.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``ServiceList`` holding ``dump()`` of every related service.
        """
        with self.lock:
            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            ordered_services = list(get_related_objects(db_context, ServiceModel))
            ordered_services.sort(key=operator.attrgetter('pk'))
            dumped_services = []
            for db_service in ordered_services:
                dumped_services.append(db_service.dump())
            return ServiceList(services=dumped_services)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        """Return a service including its endpoint ids, constraints and config rules.

        NOTE(review): ``get_object`` is not among the imports visible in this part of the file --
        confirm it is in scope.

        Args:
            request: Identifier (context UUID and service UUID) of the service to fetch.
            context: gRPC servicer context; not read by this method.

        Returns:
            A ``Service`` built from the service's ``dump()`` with all three include flags set.
        """
        with self.lock:
            context_uuid = request.context_id.context_uuid.uuid
            service_uuid = request.service_uuid.uuid
            # The service is keyed by the combination of context UUID and service UUID.
            service_key = key_to_str([context_uuid, service_uuid])
            db_service : ServiceModel = get_object(self.database, ServiceModel, service_key)
            dumped_service = db_service.dump(
                include_endpoint_ids=True, include_constraints=True, include_config_rules=True)
            return Service(**dumped_service)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        """Create or update a service together with its constraints, running config and endpoints.

        Steps, all performed while holding self.lock:
          1. Load the ContextModel named by request.service_id.context_id.
          2. Reject the request if any endpoint carries a non-empty topology context UUID that differs
             from the service's context UUID.
          3. Store the constraints and the running config rules under the service key.
          4. Update or create the ServiceModel.
          5. Get or create one ServiceEndPointModel per entry in request.service_endpoint_ids.
          6. Publish an UPDATE event (when the service was updated) or a CREATE event on TOPIC_SERVICE.

        Raises:
            InvalidArgumentException: on the context mismatch described in step 2.

        Returns:
            ServiceId built from the stored service's dump_id().
        """
        with self.lock:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

            # Validate every endpoint before anything is written to the database.
            for position, endpoint_id in enumerate(request.service_endpoint_ids):
                topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                if len(topology_context_uuid) == 0 or context_uuid == topology_context_uuid: continue
                argument_name = 'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(
                    position)
                expectation = 'should be == {:s}({:s})'.format(
                    'request.service_id.context_id.context_uuid.uuid', context_uuid)
                raise InvalidArgumentException(argument_name, topology_context_uuid, [expectation])

            service_uuid = request.service_id.service_uuid.uuid
            service_key = key_to_str([context_uuid, service_uuid])

            db_constraints = set_constraints(
                self.database, service_key, 'constraints', request.service_constraints)[0][0]

            raw_config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
            db_running_config = update_config(self.database, service_key, 'running', raw_config_rules)[0][0]

            service_attributes = {
                'context_fk'            : db_context,
                'service_uuid'          : service_uuid,
                'service_type'          : grpc_to_enum__service_type(request.service_type),
                'service_constraints_fk': db_constraints,
                'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
                'service_config_fk'     : db_running_config,
            }
            service_result : Tuple[ServiceModel, bool] = update_or_create_object(
                self.database, ServiceModel, service_key, service_attributes)
            db_service, updated = service_result

            for endpoint_id in request.service_endpoint_ids:
                endpoint_key = key_to_str([endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid])

                # The endpoint key is extended with the topology key only when the topology is fully specified.
                topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                if len(topology_context_uuid) > 0 and len(topology_uuid) > 0:
                    topology_key = key_to_str([topology_context_uuid, topology_uuid])
                    endpoint_key = key_to_str([endpoint_key, topology_key], separator=':')

                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, endpoint_key)

                service_endpoint_key = key_to_str([service_uuid, endpoint_key], separator='--')
                get_or_create_object(
                    self.database, ServiceEndPointModel, service_endpoint_key, {
                        'service_fk': db_service, 'endpoint_fk': db_endpoint})

            if updated:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
            else:
                event_type = EventTypeEnum.EVENTTYPE_CREATE
            service_id_fields = db_service.dump_id()
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id_fields})
            return ServiceId(**service_id_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        """Delete a service and publish a REMOVE event on TOPIC_SERVICE.

        When load() reports that the service was not found, nothing is deleted and no event is
        published. An Empty message is returned in both cases.
        """
        with self.lock:
            service_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            service = ServiceModel(self.database, service_key, auto_load=False)
            if service.load():
                # Capture the identifier before deleting; it is the payload of the event.
                service_id_fields = service.dump_id()
                service.delete()
                notify_event(
                    self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'service_id': service_id_fields})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Stream service events to the caller.

        Consumes messages from TOPIC_SERVICE on self.messagebroker (using CONSUME_TIMEOUT as the
        consume timeout) and yields one ServiceEvent per message, built from the JSON object stored
        in the message content.
        """
        for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield ServiceEvent(**json.loads(message.content))


    # ----- Slice ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList:
        """List the identifiers of the slices related to a context.

        While holding self.lock, looks up the ContextModel keyed by request.context_uuid.uuid, collects
        the SliceModel objects related to it, orders them by their 'pk' attribute and returns the
        dump_id() of each one wrapped in a SliceIdList.
        """
        with self.lock:
            owner_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            slices : List[SliceModel] = list(get_related_objects(owner_context, SliceModel))
            slices.sort(key=operator.attrgetter('pk'))
            slice_ids = [slice_model.dump_id() for slice_model in slices]
            return SliceIdList(slice_ids=slice_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList:
        """List the slices related to a context.

        While holding self.lock, looks up the ContextModel keyed by request.context_uuid.uuid, collects
        the SliceModel objects related to it, orders them by their 'pk' attribute and returns the
        dump() of each one wrapped in a SliceList.
        """
        with self.lock:
            owner_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
            slices : List[SliceModel] = list(get_related_objects(owner_context, SliceModel))
            slices.sort(key=operator.attrgetter('pk'))
            dumped_slices = [slice_model.dump() for slice_model in slices]
            return SliceList(slices=dumped_slices)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice:
        """Return a single slice with its endpoint ids, constraints, config rules, service ids and sub-slice ids.

        The database key is composed from the context UUID and the slice UUID carried by the request.
        """
        with self.lock:
            context_uuid = request.context_id.context_uuid.uuid
            slice_uuid = request.slice_uuid.uuid
            slice_key = key_to_str([context_uuid, slice_uuid])
            slice_model : SliceModel = get_object(self.database, SliceModel, slice_key)
            # Ask for the full representation, not just the bare slice record.
            slice_fields = slice_model.dump(
                include_endpoint_ids=True, include_constraints=True, include_config_rules=True,
                include_service_ids=True, include_subslice_ids=True)
            return Slice(**slice_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
        """Create or update a slice together with its constraints, running config and associations.

        Steps, all performed while holding self.lock:
          1. Load the ContextModel named by request.slice_id.context_id.
          2. Reject the request if any endpoint carries a non-empty topology context UUID that differs
             from the slice's context UUID.
          3. Store the constraints and the running config rules under the slice key.
          4. Update or create the SliceModel.
          5. Get or create one association record per entry in request.slice_endpoint_ids,
             request.slice_service_ids and request.slice_subslice_ids.
          6. Publish an UPDATE event (when the slice was updated) or a CREATE event on TOPIC_SLICE.

        Raises:
            InvalidArgumentException: on the context mismatch described in step 2.

        Returns:
            SliceId built from the stored slice's dump_id().
        """
        with self.lock:
            context_uuid = request.slice_id.context_id.context_uuid.uuid
            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)

            # Validate every endpoint before anything is written to the database.
            for position, endpoint_id in enumerate(request.slice_endpoint_ids):
                topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                if len(topology_context_uuid) == 0 or context_uuid == topology_context_uuid: continue
                argument_name = 'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(
                    position)
                expectation = 'should be == {:s}({:s})'.format(
                    'request.slice_id.context_id.context_uuid.uuid', context_uuid)
                raise InvalidArgumentException(argument_name, topology_context_uuid, [expectation])

            slice_uuid = request.slice_id.slice_uuid.uuid
            slice_key = key_to_str([context_uuid, slice_uuid])

            db_constraints = set_constraints(
                self.database, slice_key, 'constraints', request.slice_constraints)[0][0]

            raw_config_rules = grpc_config_rules_to_raw(request.slice_config.config_rules)
            db_running_config = update_config(self.database, slice_key, 'running', raw_config_rules)[0][0]

            slice_attributes = {
                'context_fk'          : db_context,
                'slice_uuid'          : slice_uuid,
                'slice_constraints_fk': db_constraints,
                'slice_status'        : grpc_to_enum__slice_status(request.slice_status.slice_status),
                'slice_config_fk'     : db_running_config,
            }
            slice_result : Tuple[SliceModel, bool] = update_or_create_object(
                self.database, SliceModel, slice_key, slice_attributes)
            db_slice, updated = slice_result

            # Slice <-> endpoint associations.
            for endpoint_id in request.slice_endpoint_ids:
                endpoint_key = key_to_str([endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid])

                # The endpoint key is extended with the topology key only when the topology is fully specified.
                topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
                topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                if len(topology_context_uuid) > 0 and len(topology_uuid) > 0:
                    topology_key = key_to_str([topology_context_uuid, topology_uuid])
                    endpoint_key = key_to_str([endpoint_key, topology_key], separator=':')

                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, endpoint_key)

                slice_endpoint_key = key_to_str([slice_uuid, endpoint_key], separator='--')
                get_or_create_object(
                    self.database, SliceEndPointModel, slice_endpoint_key, {
                        'slice_fk': db_slice, 'endpoint_fk': db_endpoint})

            # Slice <-> service associations.
            for service_id in request.slice_service_ids:
                service_key = key_to_str([service_id.context_id.context_uuid.uuid, service_id.service_uuid.uuid])
                db_service : ServiceModel = get_object(self.database, ServiceModel, service_key)

                slice_service_key = key_to_str([slice_key, service_key], separator='--')
                get_or_create_object(
                    self.database, SliceServiceModel, slice_service_key, {
                        'slice_fk': db_slice, 'service_fk': db_service})

            # Slice <-> sub-slice associations.
            for subslice_id in request.slice_subslice_ids:
                subslice_key = key_to_str([subslice_id.context_id.context_uuid.uuid, subslice_id.slice_uuid.uuid])
                db_subslice : SliceModel = get_object(self.database, SliceModel, subslice_key)

                slice_subslice_key = key_to_str([slice_key, subslice_key], separator='--')
                get_or_create_object(
                    self.database, SliceSubSliceModel, slice_subslice_key, {
                        'slice_fk': db_slice, 'sub_slice_fk': db_subslice})

            if updated:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
            else:
                event_type = EventTypeEnum.EVENTTYPE_CREATE
            slice_id_fields = db_slice.dump_id()
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id_fields})
            return SliceId(**slice_id_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
        """Delete a slice and publish a REMOVE event on TOPIC_SLICE.

        When load() reports that the slice was not found, nothing is deleted and no event is
        published. An Empty message is returned in both cases.
        """
        with self.lock:
            slice_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid])
            slice_model = SliceModel(self.database, slice_key, auto_load=False)
            if slice_model.load():
                # Capture the identifier before deleting; it is the payload of the event.
                slice_id_fields = slice_model.dump_id()
                slice_model.delete()
                notify_event(
                    self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'slice_id': slice_id_fields})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        """Stream slice events to the caller.

        Consumes messages from TOPIC_SLICE on self.messagebroker (using CONSUME_TIMEOUT as the
        consume timeout) and yields one SliceEvent per message, built from the JSON object stored
        in the message content.
        """
        topics = {TOPIC_SLICE}
        broker_messages = self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT)
        for broker_message in broker_messages:
            event_fields = json.loads(broker_message.content)
            yield SliceEvent(**event_fields)


    # ----- Connection -------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        """List the identifiers of the connections related to a service.

        While holding self.lock, looks up the ServiceModel keyed by the request's context UUID and
        service UUID, collects the ConnectionModel objects related to it, orders them by their 'pk'
        attribute and returns the dump_id() of each one wrapped in a ConnectionIdList.
        """
        with self.lock:
            service_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            owner_service : ServiceModel = get_object(self.database, ServiceModel, service_key)
            connections : List[ConnectionModel] = list(get_related_objects(owner_service, ConnectionModel))
            connections.sort(key=operator.attrgetter('pk'))
            connection_ids = [connection.dump_id() for connection in connections]
            return ConnectionIdList(connection_ids=connection_ids)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList:
        """List the connections related to a service.

        While holding self.lock, looks up the ServiceModel keyed by the request's context UUID and
        service UUID, collects the ConnectionModel objects related to it, orders them by their 'pk'
        attribute and returns the dump() of each one wrapped in a ConnectionList.

        The request is a ServiceId (the body reads request.context_id and request.service_uuid) and
        the result is a ConnectionList; the annotations reflect that.
        """
        with self.lock:
            str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
            db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
            db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
            db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
            return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections])

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
        """Return a single connection, including its path and its sub-service ids.

        The connection is looked up directly by request.connection_uuid.uuid.
        """
        with self.lock:
            connection_uuid = request.connection_uuid.uuid
            connection : ConnectionModel = get_object(self.database, ConnectionModel, connection_uuid)
            connection_fields = connection.dump(include_path=True, include_sub_service_ids=True)
            return Connection(**connection_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
        """Create or update a connection together with its path and sub-service associations.

        Steps, all performed while holding self.lock:
          1. If the request carries both a service context UUID and a service UUID, load that
             ServiceModel and reference it from the connection ('service_fk'); otherwise the
             attribute is left out.
          2. Store the path hops through set_path() and reference the result ('path_fk').
          3. Update or create the ConnectionModel keyed by the connection UUID.
          4. Get or create one ConnectionSubServiceModel per entry in request.sub_service_ids.
          5. Publish an UPDATE event (when the connection was updated) or a CREATE event on
             TOPIC_CONNECTION.

        Returns:
            ConnectionId built from the stored connection's dump_id().
        """
        with self.lock:
            connection_uuid = request.connection_id.connection_uuid.uuid
            attributes = {'connection_uuid': connection_uuid}

            parent_context_uuid = request.service_id.context_id.context_uuid.uuid
            parent_service_uuid = request.service_id.service_uuid.uuid
            if len(parent_context_uuid) > 0 and len(parent_service_uuid) > 0:
                parent_service_key = key_to_str([parent_context_uuid, parent_service_uuid])
                db_parent_service : ServiceModel = get_object(self.database, ServiceModel, parent_service_key)
                attributes['service_fk'] = db_parent_service

            # Only the first element of set_path()'s result is needed here.
            attributes['path_fk'] = set_path(
                self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '')[0]

            connection_result : Tuple[ConnectionModel, bool] = update_or_create_object(
                self.database, ConnectionModel, connection_uuid, attributes)
            db_connection, updated = connection_result

            for sub_service_id in request.sub_service_ids:
                sub_service_key = key_to_str([
                    sub_service_id.context_id.context_uuid.uuid, sub_service_id.service_uuid.uuid])
                db_sub_service : ServiceModel = get_object(self.database, ServiceModel, sub_service_key)

                connection_sub_service_key = key_to_str([connection_uuid, sub_service_key], separator='--')
                get_or_create_object(
                    self.database, ConnectionSubServiceModel, connection_sub_service_key, {
                        'connection_fk': db_connection, 'sub_service_fk': db_sub_service})

            if updated:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
            else:
                event_type = EventTypeEnum.EVENTTYPE_CREATE
            connection_id_fields = db_connection.dump_id()
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id_fields})
            return ConnectionId(**connection_id_fields)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
        """Delete a connection and publish a REMOVE event on TOPIC_CONNECTION.

        When load() reports that the connection was not found, nothing is deleted and no event is
        published. An Empty message is returned in both cases.
        """
        with self.lock:
            connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False)
            if connection.load():
                # Capture the identifier before deleting; it is the payload of the event.
                connection_id_fields = connection.dump_id()
                connection.delete()
                notify_event(
                    self.messagebroker, TOPIC_CONNECTION, EventTypeEnum.EVENTTYPE_REMOVE,
                    {'connection_id': connection_id_fields})
            return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        """Stream connection events to the caller.

        Consumes messages from TOPIC_CONNECTION on self.messagebroker (using CONSUME_TIMEOUT as the
        consume timeout) and yields one ConnectionEvent per message, built from the JSON object
        stored in the message content.
        """
        topics = {TOPIC_CONNECTION}
        broker_messages = self.messagebroker.consume(topics, consume_timeout=CONSUME_TIMEOUT)
        for broker_message in broker_messages:
            event_fields = json.loads(broker_message.content)
            yield ConnectionEvent(**event_fields)