Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 64.6 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, operator, sqlalchemy, threading, time, uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from sqlalchemy.dialects.postgresql import UUID, insert
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from common.orm.backend.Tools import key_to_str
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty, EventTypeEnum,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
    ConfigActionEnum, Constraint)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Context import json_context_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.rpc_method_wrapper.ServiceExceptions import (
    InvalidArgumentException, NotFoundException, OperationFailedException)
#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
#from context.service.Database import Database
#from context.service.database.ConfigModel import (
#    ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
#from context.service.database.ConnectionModel import ConnectionModel, set_path
#from context.service.database.ConstraintModel import (
#    ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.ContextModel import ContextModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.DeviceModel import (
    DeviceModel, grpc_to_enum__device_operational_status, grpc_to_enum__device_driver)
from context.service.database.EndPointModel import EndPointModel, grpc_to_enum__kpi_sample_type
#from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from context.service.database.Events import notify_event
#from context.service.database.LinkModel import LinkModel
#from context.service.database.PolicyRuleModel import PolicyRuleModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.RelationModels import TopologyDeviceModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    SliceSubSliceModel, TopologyLinkModel)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from context.service.database.ServiceModel import (
#    ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.TopologyModel import TopologyModel
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from .Constants import (
#    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
#    TOPIC_TOPOLOGY)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#from .ChangeFeedClient import ChangeFeedClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'Context'
METHOD_NAMES = [
    'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    'ListContextIds',    'ListContexts',    'GetContext',    'SetContext',    'RemoveContext',    'GetContextEvents',
    'ListTopologyIds',   'ListTopologies',  'GetTopology',   'SetTopology',   'RemoveTopology',   'GetTopologyEvents',
    'ListDeviceIds',     'ListDevices',     'GetDevice',     'SetDevice',     'RemoveDevice',     'GetDeviceEvents',
    'ListLinkIds',       'ListLinks',       'GetLink',       'SetLink',       'RemoveLink',       'GetLinkEvents',
    'ListServiceIds',    'ListServices',    'GetService',    'SetService',    'RemoveService',    'GetServiceEvents',
    'ListSliceIds',      'ListSlices',      'GetSlice',      'SetSlice',      'RemoveSlice',      'GetSliceEvents',
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    'UnsetService',      'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        #self.lock = threading.Lock()
        #session = sessionmaker(bind=db_engine, expire_on_commit=False)
        #self.session = session
        #self.database = Database(session)
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ContextModel] = session.query(ContextModel).all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return [obj.dump_id() for obj in obj_list]
        return ContextIdList(context_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ContextModel] = session.query(ContextModel).all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return [obj.dump() for obj in obj_list]
        return ContextList(contexts=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> Optional[Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj : Optional[ContextModel] = session.query(ContextModel)\
                .filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
        return Context(**obj)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_id.context_uuid.uuid
        context_name = request.name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        for i, topology_id in enumerate(request.topology_ids):
            topology_context_uuid = topology_id.context_id.context_uuid.uuid
            if topology_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

        for i, service_id in enumerate(request.service_ids):
            service_context_uuid = service_id.context_id.context_uuid.uuid
            if service_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for i, slice_id in enumerate(request.slice_ids):
            slice_context_uuid = slice_id.context_id.context_uuid.uuid
            if slice_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.slice_ids[{:d}].context_id.context_uuid.uuid'.format(i), slice_context_uuid,
                    ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> Tuple[Optional[Dict], bool]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj : Optional[ContextModel] = session.query(ContextModel).with_for_update()\
                .filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            is_update = obj is not None
            if is_update:
                obj.context_name = context_name
                session.merge(obj)
            else:
                session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time()))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj : Optional[ContextModel] = session.query(ContextModel)\
                .filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return (None if obj is None else obj.dump_id()), is_update
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': obj_id})
        return ContextId(**obj_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        def callback(session : Session) -> bool:
            num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
            return num_deleted > 0

        deleted = run_transaction(sessionmaker(bind=self.db_engine), callback)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
        return Empty()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        pass
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
        #    yield ContextEvent(**json.loads(message.content))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #cf = ChangeFeedClient()
        #ready = cf.initialize()
        #if not ready: raise OperationFailedException('Initialize ChangeFeed')
        #for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
        #    if is_delete:
        #        event_type = EventTypeEnum.EVENTTYPE_REMOVE
        #    else:
        #        is_create = (timestamp - after.get('created_at')) < 1.0
        #        event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
        #    event = {
        #        'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
        #        'context_id': json_context_id(primary_key[0]),
        #    }
        #    yield ContextEvent(**event)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Topology ---------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_uuid.uuid
        def callback(session : Session) -> List[Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all()
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return [obj.dump_id() for obj in obj_list]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return TopologyIdList(topology_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> List[Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj_list : List[TopologyModel] = session.query(TopologyModel).filter_by(context_uuid=context_uuid).all()
            #.options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return [obj.dump() for obj in obj_list]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return TopologyList(topologies=run_transaction(sessionmaker(bind=self.db_engine), callback))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_uuid = request.context_id.context_uuid.uuid
        topology_uuid = request.topology_uuid.uuid

        def callback(session : Session) -> Optional[Dict]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            obj : Optional[TopologyModel] = session.query(TopologyModel)\
                .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).one_or_none()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if obj is None:
            obj_uuid = '{:s}/{:s}'.format(context_uuid, topology_uuid)
            raise NotFoundException(TopologyModel.__name__.replace('Model', ''), obj_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Topology(**obj)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
        context_uuid = request.topology_id.context_id.context_uuid.uuid
        topology_uuid = request.topology_id.topology_uuid.uuid
        topology_name = request.name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        devices_to_add : List[str] = [
            {'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'device_uuid': device_id.device_uuid.uuid}
            for device_id in request.device_ids
        ]
        links_to_add : List[str] = [
            {'context_uuid': context_uuid, 'topology_uuid': topology_uuid, 'link_uuid': link_id.link_uuid.uuid}
            for link_id in request.link_ids
        ]
        print('devices_to_add', devices_to_add)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        def callback(session : Session) -> Tuple[Optional[Dict], bool]:
            topology_data = [{
                'context_uuid' : context_uuid,
                'topology_uuid': topology_uuid,
                'topology_name': topology_name,
                'created_at'   : time.time(),
            }]
            stmt = insert(TopologyModel).values(topology_data)
            stmt = stmt.on_conflict_do_update(
                index_elements=[TopologyModel.context_uuid, TopologyModel.topology_uuid],
                set_=dict(topology_name = stmt.excluded.topology_name)
            )
            session.execute(stmt)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(devices_to_add) > 0:
                session.execute(insert(TopologyDeviceModel).values(devices_to_add).on_conflict_do_nothing(
                    index_elements=[
                        TopologyDeviceModel.context_uuid, TopologyDeviceModel.topology_uuid,
                        TopologyDeviceModel.device_uuid
                    ]
                ))

            #if len(link_to_add) > 0:
            #    session.execute(insert(TopologyLinkModel).values(link_to_add).on_conflict_do_nothing(
            #        index_elements=[
            #            TopologyLinkModel.context_uuid, TopologyLinkModel.topology_uuid,
            #            TopologyLinkModel.link_uuid
            #        ]
            #    ))

            is_update = True
            obj : Optional[TopologyModel] = session.query(TopologyModel)\
                .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).one_or_none()
            return (None if obj is None else obj.dump_id()), is_update

        obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': obj_id})
        return TopologyId(**obj_id)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        context_uuid = request.context_id.context_uuid.uuid
        topology_uuid = request.topology_uuid.uuid

        def callback(session : Session) -> bool:
            num_deleted = session.query(TopologyModel)\
                .filter_by(context_uuid=context_uuid, topology_uuid=topology_uuid).delete()
            return num_deleted > 0

        deleted = run_transaction(sessionmaker(bind=self.db_engine), callback)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        pass
        #for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
        #    yield TopologyEvent(**json.loads(message.content))

    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
        def callback(session : Session) -> List[Dict]:
            obj_list : List[DeviceModel] = session.query(DeviceModel).all()
            #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
            return [obj.dump_id() for obj in obj_list]
        return DeviceIdList(device_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
        def callback(session : Session) -> List[Dict]:
            obj_list : List[DeviceModel] = session.query(DeviceModel).all()
            #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
            return [obj.dump() for obj in obj_list]
        return DeviceList(devices=run_transaction(sessionmaker(bind=self.db_engine), callback))

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device:
        device_uuid = request.device_uuid.uuid
        def callback(session : Session) -> Optional[Dict]:
            obj : Optional[DeviceModel] = session.query(DeviceModel)\
                .filter_by(device_uuid=device_uuid).one_or_none()
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj is None: raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid)
        return Device(**obj)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_uuid = request.device_id.device_uuid.uuid
        device_name = request.name
        device_type = request.device_type
        oper_status = grpc_to_enum__device_operational_status(request.device_operational_status)
        device_drivers = [grpc_to_enum__device_driver(d) for d in request.device_drivers]

        related_topology_uuids : Set[Tuple[str, str]] = set()
        endpoints_data : List[Dict] = list()
        for i, endpoint in enumerate(request.device_endpoints):
            endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
            if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
            if device_uuid != endpoint_device_uuid:
                raise InvalidArgumentException(
                    'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
                    ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])

            endpoint_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
            endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid

            kpi_sample_types = [grpc_to_enum__kpi_sample_type(kst) for kst in endpoint.kpi_sample_types]

            endpoints_data.append({
                'context_uuid'    : endpoint_context_uuid,
                'topology_uuid'   : endpoint_topology_uuid,
                'device_uuid'     : endpoint_device_uuid,
                'endpoint_uuid'   : endpoint.endpoint_id.endpoint_uuid.uuid,
                'endpoint_type'   : endpoint.endpoint_type,
                'kpi_sample_types': kpi_sample_types,
            })

            if len(endpoint_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                related_topology_uuids.add({
                    'context_uuid': endpoint_context_uuid,
                    'topology_uuid': endpoint_topology_uuid,
                    'device_uuid': endpoint_device_uuid,
                })

        def callback(session : Session) -> Tuple[Optional[Dict], bool]:
            obj : Optional[DeviceModel] = session.query(DeviceModel).with_for_update()\
                .filter_by(device_uuid=device_uuid).one_or_none()
            is_update = obj is not None
            if is_update:
                obj.device_name = device_name
                obj.device_type = device_type
                obj.device_operational_status = oper_status
                obj.device_drivers = device_drivers
                session.merge(obj)
            else:
                session.add(DeviceModel(
                    device_uuid=device_uuid, device_name=device_name, device_type=device_type,
                    device_operational_status=oper_status, device_drivers=device_drivers, created_at=time.time()))
            obj : Optional[DeviceModel] = session.query(DeviceModel)\
                .filter_by(device_uuid=device_uuid).one_or_none()

            stmt = insert(EndPointModel).values(endpoints_data)
            stmt = stmt.on_conflict_do_update(
                index_elements=[
                    EndPointModel.context_uuid, EndPointModel.topology_uuid, EndPointModel.device_uuid,
                    EndPointModel.endpoint_uuid
                ],
                set_=dict(
                    endpoint_type = stmt.excluded.endpoint_type,
                    kpi_sample_types = stmt.excluded.kpi_sample_types,
                )
            )
            session.execute(stmt)

            session.execute(insert(TopologyDeviceModel).values(list(related_topology_uuids)).on_conflict_do_nothing(
                index_elements=[
                    TopologyDeviceModel.context_uuid, TopologyDeviceModel.topology_uuid,
                    TopologyDeviceModel.device_uuid
                ]
            ))

            return (None if obj is None else obj.dump_id()), is_update

        obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
        if obj_id is None: raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid)

        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': obj_id})
        return DeviceId(**obj_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.session() as session:
#            config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
#            running_config_result = self.update_config(session, device_uuid, 'device', config_rules)
#            db_running_config = running_config_result[0][0]
#            config_uuid = db_running_config.config_uuid
#            running_config_rules = update_config(
#                self.database, device_uuid, 'device', request.device_config.config_rules)
#            db_running_config = running_config_rules[0][0]
#
#            new_obj = DeviceModel(**{
#                'device_uuid'               : device_uuid,
#                'device_type'               : request.device_type,
#                'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status),
#                'device_config_uuid'        : config_uuid,
#            })
#            result: Tuple[DeviceModel, bool] = self.database.create_or_update(new_obj)
#            db_device, updated = result
#
#            self.set_drivers(db_device, request.device_drivers)
#
#
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid
        def callback(session : Session) -> bool:
            session.query(TopologyDeviceModel).filter_by(device_uuid=device_uuid).delete()
            num_deleted = session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete()
            #db_device = session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none()
            #session.query(ConfigRuleModel).filter_by(config_uuid=db_device.device_config_uuid).delete()
            #session.query(ConfigModel).filter_by(config_uuid=db_device.device_config_uuid).delete()
            #session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete()
            return num_deleted > 0
        deleted = run_transaction(sessionmaker(bind=self.db_engine), callback)
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        pass
        #for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
        #    yield DeviceEvent(**json.loads(message.content))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    # ----- Link -------------------------------------------------------------------------------------------------------
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.session() as session:
#            result = session.query(LinkModel).all()
#            return LinkIdList(link_ids=[db_link.dump_id() for db_link in result])
#
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.session() as session:
#            link_list = LinkList()
#
#            db_links = session.query(LinkModel).all()
#
#            for db_link in db_links:
#                link_uuid = db_link.link_uuid
#                filt = {'link_uuid': link_uuid}
#                link_endpoints = session.query(LinkEndPointModel).filter_by(**filt).all()
#                if link_endpoints:
#                    eps = []
#                    for lep in link_endpoints:
#                        filt = {'endpoint_uuid': lep.endpoint_uuid}
#                        eps.append(session.query(EndPointModel).filter_by(**filt).one())
#                    link_list.links.append(Link(**db_link.dump(eps)))
#
#            return link_list
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        link_uuid = request.link_uuid.uuid
#        with self.session() as session:
#            result = session.query(LinkModel).filter(LinkModel.link_uuid == link_uuid).one_or_none()
#            if not result:
#                raise NotFoundException(LinkModel.__name__.replace('Model', ''), link_uuid)
#
#            filt = {'link_uuid': link_uuid}
#            link_endpoints = session.query(LinkEndPointModel).filter_by(**filt).all()
#            if link_endpoints:
#                eps = []
#                for lep in link_endpoints:
#                    filt = {'endpoint_uuid': lep.endpoint_uuid}
#                    eps.append(session.query(EndPointModel).filter_by(**filt).one())
#                return Link(**result.dump(eps))
#
#            rd = result.dump()
#            rt = Link(**rd)
#
#            return rt
#
#
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        link_uuid = request.link_id.link_uuid.uuid
#
#        new_link = LinkModel(**{
#            'link_uuid': link_uuid
#        })
#        result: Tuple[LinkModel, bool] = self.database.create_or_update(new_link)
#        db_link, updated = result
#
#        for endpoint_id in request.link_endpoint_ids:
#            endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#            endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#            endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#
#            db_topology = None
#            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                db_topology: TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid)
#                # check device is in topology
#                self.database.get_object(TopologyDeviceModel, endpoint_device_uuid)
#
#
#            link_endpoint = LinkEndPointModel(link_uuid=link_uuid, endpoint_uuid=endpoint_uuid)
#            result: Tuple[LinkEndPointModel, bool] = self.database.create_or_update(link_endpoint)
#
#            if db_topology is not None:
#                topology_link = TopologyLinkModel(topology_uuid=endpoint_topology_uuid, link_uuid=link_uuid)
#                result: Tuple[TopologyLinkModel, bool] = self.database.create_or_update(topology_link)
#
#        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#        dict_link_id = db_link.dump_id()
#        notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
#        return LinkId(**dict_link_id)
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.session() as session:
#            link_uuid = request.link_uuid.uuid
#
#            session.query(TopologyLinkModel).filter_by(link_uuid=link_uuid).delete()
#            session.query(LinkEndPointModel).filter_by(link_uuid=link_uuid).delete()
#
#            result = session.query(LinkModel).filter_by(link_uuid=link_uuid).one_or_none()
#            if not result:
#                return Empty()
#            dict_link_id = result.dump_id()
#
#            session.query(LinkModel).filter_by(link_uuid=link_uuid).delete()
#            session.commit()
#            event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
#            return Empty()
#
##    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##    def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##        for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
##            yield LinkEvent(**json.loads(message.content))
#
#
#    # ----- Service ----------------------------------------------------------------------------------------------------
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        context_uuid = request.context_uuid.uuid
#
#        with self.session() as session:
#            db_services = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
#            return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services])
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        context_uuid = request.context_uuid.uuid
#
#        with self.session() as session:
#            db_services = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
#            return ServiceList(services=[db_service.dump() for db_service in db_services])
#
#
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        service_uuid = request.service_uuid.uuid
#        with self.session() as session:
#            result = session.query(ServiceModel).filter_by(service_uuid=service_uuid).one_or_none()
#
#        if not result:
#            raise NotFoundException(ServiceModel.__name__.replace('Model', ''), service_uuid)
#
#        return Service(**result.dump())
#
#    def set_constraint(self, db_constraints: ConstraintsModel, grpc_constraint: Constraint, position: int
#    ) -> Tuple[Union_ConstraintModel, bool]:
#        with self.session() as session:
#
#            grpc_constraint_kind = str(grpc_constraint.WhichOneof('constraint'))
#
#            parser = CONSTRAINT_PARSERS.get(grpc_constraint_kind)
#            if parser is None:
#                raise NotImplementedError('Constraint of kind {:s} is not implemented: {:s}'.format(
#                    grpc_constraint_kind, grpc_message_to_json_string(grpc_constraint)))
#
#            # create specific constraint
#            constraint_class, str_constraint_id, constraint_data, constraint_kind = parser(grpc_constraint)
#            str_constraint_id = str(uuid.uuid4())
#            LOGGER.info('str_constraint_id: {}'.format(str_constraint_id))
#            # str_constraint_key_hash = fast_hasher(':'.join([constraint_kind.value, str_constraint_id]))
#            # str_constraint_key = key_to_str([db_constraints.pk, str_constraint_key_hash], separator=':')
#
#            # result : Tuple[Union_ConstraintModel, bool] = update_or_create_object(
#            #     database, constraint_class, str_constraint_key, constraint_data)
#            constraint_data[constraint_class.main_pk_name()] = str_constraint_id
#            db_new_constraint = constraint_class(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_specific_constraint, updated = result
#
#            # create generic constraint
#            # constraint_fk_field_name = 'constraint_uuid'.format(constraint_kind.value)
#            constraint_data = {
#                'constraints_uuid': db_constraints.constraints_uuid, 'position': position, 'kind': constraint_kind
#            }
#
#            db_new_constraint = ConstraintModel(**constraint_data)
#            result: Tuple[Union_ConstraintModel, bool] = self.database.create_or_update(db_new_constraint)
#            db_constraint, updated = result
#
#            return db_constraint, updated
#
#    def set_constraints(self, service_uuid: str, constraints_name : str, grpc_constraints
#    ) -> List[Tuple[Union[ConstraintsModel, ConstraintModel], bool]]:
#        with self.session() as session:
#            # str_constraints_key = key_to_str([db_parent_pk, constraints_name], separator=':')
#            # result : Tuple[ConstraintsModel, bool] = get_or_create_object(database, ConstraintsModel, str_constraints_key)
#            result = session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            created = None
#            if result:
#                created = True
#            session.query(ConstraintsModel).filter_by(constraints_uuid=service_uuid).one_or_none()
#            db_constraints = ConstraintsModel(constraints_uuid=service_uuid)
#            session.add(db_constraints)
#
#            db_objects = [(db_constraints, created)]
#
#            for position,grpc_constraint in enumerate(grpc_constraints):
#                result : Tuple[ConstraintModel, bool] = self.set_constraint(
#                    db_constraints, grpc_constraint, position)
#                db_constraint, updated = result
#                db_objects.append((db_constraint, updated))
#
#            return db_objects
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            with self.session() as session:
#
#                context_uuid = request.service_id.context_id.context_uuid.uuid
#                # db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#                db_context = session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none()
#
#                for i,endpoint_id in enumerate(request.service_endpoint_ids):
#                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                    if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                        raise InvalidArgumentException(
#                            'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                            endpoint_topology_context_uuid,
#                            ['should be == {:s}({:s})'.format(
#                                'request.service_id.context_id.context_uuid.uuid', context_uuid)])
#
#                service_uuid = request.service_id.service_uuid.uuid
#                # str_service_key = key_to_str([context_uuid, service_uuid])
#
#                constraints_result = self.set_constraints(service_uuid, 'constraints', request.service_constraints)
#                db_constraints = constraints_result[0][0]
#
#                config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
#                running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
#                db_running_config = running_config_result[0][0]
#
#                result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
#                    'context_fk'            : db_context,
#                    'service_uuid'          : service_uuid,
#                    'service_type'          : grpc_to_enum__service_type(request.service_type),
#                    'service_constraints_fk': db_constraints,
#                    'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
#                    'service_config_fk'     : db_running_config,
#                })
#                db_service, updated = result
#
#                for i,endpoint_id in enumerate(request.service_endpoint_ids):
#                    endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#                    endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#                    endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#                    endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#                    str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#                    if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                        str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#                        str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#                    db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#                    str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
#                    result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
#                        self.database, ServiceEndPointModel, str_service_endpoint_key, {
#                            'service_fk': db_service, 'endpoint_fk': db_endpoint})
#                    #db_service_endpoint, service_endpoint_created = result
#
#                event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#                dict_service_id = db_service.dump_id()
#                notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
#                return ServiceId(**dict_service_id)
#            context_uuid = request.service_id.context_id.context_uuid.uuid
#            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#            for i,endpoint_id in enumerate(request.service_endpoint_ids):
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                    raise InvalidArgumentException(
#                        'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                        endpoint_topology_context_uuid,
#                        ['should be == {:s}({:s})'.format(
#                            'request.service_id.context_id.context_uuid.uuid', context_uuid)])
#
#            service_uuid = request.service_id.service_uuid.uuid
#            str_service_key = key_to_str([context_uuid, service_uuid])
#
#            constraints_result = set_constraints(
#                self.database, str_service_key, 'service', request.service_constraints)
#            db_constraints = constraints_result[0][0]
#
#            running_config_rules = update_config(
#                self.database, str_service_key, 'service', request.service_config.config_rules)
#            db_running_config = running_config_rules[0][0]
#
#            result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
#                'context_fk'            : db_context,
#                'service_uuid'          : service_uuid,
#                'service_type'          : grpc_to_enum__service_type(request.service_type),
#                'service_constraints_fk': db_constraints,
#                'service_status'        : grpc_to_enum__service_status(request.service_status.service_status),
#                'service_config_fk'     : db_running_config,
#            })
#            db_service, updated = result
#
#            for i,endpoint_id in enumerate(request.service_endpoint_ids):
#                endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#                endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#                endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#                str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#                str_service_endpoint_key = key_to_str([service_uuid, str_endpoint_key], separator='--')
#                result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
#                    self.database, ServiceEndPointModel, str_service_endpoint_key, {
#                        'service_fk': db_service, 'endpoint_fk': db_endpoint})
#                #db_service_endpoint, service_endpoint_created = result
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            dict_service_id = db_service.dump_id()
#            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
#            return ServiceId(**dict_service_id)
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            context_uuid = request.context_id.context_uuid.uuid
#            service_uuid = request.service_uuid.uuid
#            db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False)
#            found = db_service.load()
#            if not found: return Empty()
#
#            dict_service_id = db_service.dump_id()
#            db_service.delete()
#
#            event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
#            return Empty()
#
##    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##    def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##        for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
##            yield ServiceEvent(**json.loads(message.content))
#
#
#    # ----- Slice ----------------------------------------------------------------------------------------------------
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
#            db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
#            db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
#            return SliceIdList(slice_ids=[db_slice.dump_id() for db_slice in db_slices])
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
#            db_slices : Set[SliceModel] = get_related_objects(db_context, SliceModel)
#            db_slices = sorted(db_slices, key=operator.attrgetter('pk'))
#            return SliceList(slices=[db_slice.dump() for db_slice in db_slices])
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            str_key = key_to_str([request.context_id.context_uuid.uuid, request.slice_uuid.uuid])
#            db_slice : SliceModel = get_object(self.database, SliceModel, str_key)
#            return Slice(**db_slice.dump(
#                include_endpoint_ids=True, include_constraints=True, include_config_rules=True,
#                include_service_ids=True, include_subslice_ids=True))
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            context_uuid = request.slice_id.context_id.context_uuid.uuid
#            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                    raise InvalidArgumentException(
#                        'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                        endpoint_topology_context_uuid,
#                        ['should be == {:s}({:s})'.format(
#                            'request.slice_id.context_id.context_uuid.uuid', context_uuid)])
#
#            slice_uuid = request.slice_id.slice_uuid.uuid
#            str_slice_key = key_to_str([context_uuid, slice_uuid])
#
#            constraints_result = set_constraints(
#                self.database, str_slice_key, 'slice', request.slice_constraints)
#            db_constraints = constraints_result[0][0]
#
#            running_config_rules = update_config(
#                self.database, str_slice_key, 'slice', request.slice_config.config_rules)
#            db_running_config = running_config_rules[0][0]
#
#            result : Tuple[SliceModel, bool] = update_or_create_object(self.database, SliceModel, str_slice_key, {
#                'context_fk'          : db_context,
#                'slice_uuid'          : slice_uuid,
#                'slice_constraints_fk': db_constraints,
#                'slice_status'        : grpc_to_enum__slice_status(request.slice_status.slice_status),
#                'slice_config_fk'     : db_running_config,
#                'slice_owner_uuid'    : request.slice_owner.owner_uuid.uuid,
#                'slice_owner_string'  : request.slice_owner.owner_string,
#            })
#            db_slice, updated = result
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_uuid                  = endpoint_id.endpoint_uuid.uuid
#                endpoint_device_uuid           = endpoint_id.device_id.device_uuid.uuid
#                endpoint_topology_uuid         = endpoint_id.topology_id.topology_uuid.uuid
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#
#                str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
#                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
#                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
#                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
#
#                db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
#
#                str_slice_endpoint_key = key_to_str([str_slice_key, str_endpoint_key], separator='--')
#                result : Tuple[SliceEndPointModel, bool] = get_or_create_object(
#                    self.database, SliceEndPointModel, str_slice_endpoint_key, {
#                        'slice_fk': db_slice, 'endpoint_fk': db_endpoint})
#                #db_slice_endpoint, slice_endpoint_created = result
#
#            for i,service_id in enumerate(request.slice_service_ids):
#                service_uuid         = service_id.service_uuid.uuid
#                service_context_uuid = service_id.context_id.context_uuid.uuid
#                str_service_key = key_to_str([service_context_uuid, service_uuid])
#                db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
#
#                str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--')
#                result : Tuple[SliceServiceModel, bool] = get_or_create_object(
#                    self.database, SliceServiceModel, str_slice_service_key, {
#                        'slice_fk': db_slice, 'service_fk': db_service})
#                #db_slice_service, slice_service_created = result
#
#            for i,subslice_id in enumerate(request.slice_subslice_ids):
#                subslice_uuid         = subslice_id.slice_uuid.uuid
#                subslice_context_uuid = subslice_id.context_id.context_uuid.uuid
#                str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid])
#                db_subslice : SliceModel = get_object(self.database, SliceModel, str_subslice_key)
#
#                str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--')
#                result : Tuple[SliceSubSliceModel, bool] = get_or_create_object(
#                    self.database, SliceSubSliceModel, str_slice_subslice_key, {
#                        'slice_fk': db_slice, 'sub_slice_fk': db_subslice})
#                #db_slice_subslice, slice_subslice_created = result
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            dict_slice_id = db_slice.dump_id()
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return SliceId(**dict_slice_id)
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            context_uuid = request.slice_id.context_id.context_uuid.uuid
#            db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
#
#            for i,endpoint_id in enumerate(request.slice_endpoint_ids):
#                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
#                if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
#                    raise InvalidArgumentException(
#                        'request.slice_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
#                        endpoint_topology_context_uuid,
#                        ['should be == {:s}({:s})'.format(
#                            'request.slice_id.context_id.context_uuid.uuid', context_uuid)])
#
#            slice_uuid = request.slice_id.slice_uuid.uuid
#            str_slice_key = key_to_str([context_uuid, slice_uuid])
#
#            if len(request.slice_constraints) > 0:
#                raise NotImplementedError('UnsetSlice: removal of constraints')
#            if len(request.slice_config.config_rules) > 0:
#                raise NotImplementedError('UnsetSlice: removal of config rules')
#            if len(request.slice_endpoint_ids) > 0:
#                raise NotImplementedError('UnsetSlice: removal of endpoints')
#
#            updated = False
#
#            for service_id in request.slice_service_ids:
#                service_uuid         = service_id.service_uuid.uuid
#                service_context_uuid = service_id.context_id.context_uuid.uuid
#                str_service_key = key_to_str([service_context_uuid, service_uuid])
#                str_slice_service_key = key_to_str([str_slice_key, str_service_key], separator='--')
#                SliceServiceModel(self.database, str_slice_service_key).delete()
#                updated = True
#
#            for subslice_id in request.slice_subslice_ids:
#                subslice_uuid         = subslice_id.slice_uuid.uuid
#                subslice_context_uuid = subslice_id.context_id.context_uuid.uuid
#                str_subslice_key = key_to_str([subslice_context_uuid, subslice_uuid])
#                str_slice_subslice_key = key_to_str([str_slice_key, str_subslice_key], separator='--')
#                SliceSubSliceModel(self.database, str_slice_subslice_key).delete()
#                updated = True
#
#            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
#            db_slice : SliceModel = get_object(self.database, SliceModel, str_slice_key)
#            dict_slice_id = db_slice.dump_id()
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return SliceId(**dict_slice_id)
#
#    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#        with self.lock:
#            context_uuid = request.context_id.context_uuid.uuid
#            slice_uuid = request.slice_uuid.uuid
#            db_slice = SliceModel(self.database, key_to_str([context_uuid, slice_uuid]), auto_load=False)
#            found = db_slice.load()
#            if not found: return Empty()
#
#            dict_slice_id = db_slice.dump_id()
#            db_slice.delete()
#
#            event_type = EventTypeEnum.EVENTTYPE_REMOVE
#            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': dict_slice_id})
#            return Empty()
#
##    @safe_and_metered_rpc_method(METRICS, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##    def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
##        for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
##            yield SliceEvent(**json.loads(message.content))