Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 17.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import grpc, json, logging, sqlalchemy
    from typing import Iterator
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.message_broker.MessageBroker import MessageBroker
    from common.proto.context_pb2 import (
        Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
        Context, ContextEvent, ContextId, ContextIdList, ContextList,
        Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        Empty,
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        Link, LinkEvent, LinkId, LinkIdList, LinkList,
        Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
        Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.proto.context_pb2_grpc import ContextServiceServicer
    from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.Connection import (
        connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set
    from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set
    from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.PolicyRule import (
        policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .database.Topology import topology_delete, topology_get, topology_list_ids, topology_list_objs, topology_set
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from .Constants import (
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, #TOPIC_POLICY,
        TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    # Module-level logger, named after this module; also handed to the RPC decorator below.
    LOGGER = logging.getLogger(__name__)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Metrics pool passed to `safe_and_metered_rpc_method` on every RPC of the servicer
    # and returned by its `_get_metrics()` accessor.
    METRICS_POOL = MetricsPool('Context', 'RPC')
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
        """gRPC servicer implementing the Context and ContextPolicy services.

        Every unary RPC delegates to a helper of the `.database` package, handing it
        the SQLAlchemy engine received at construction time (plus the request when
        the helper needs it). The `Get*Events` RPCs stream the messages consumed
        from the message broker for the corresponding topic.

        NOTE: event publication in the Set*/Unset*/Remove* RPCs is commented out,
        so the `updated` / `deleted` results returned by the database helpers are
        currently unused (hence the pylint suppressions).
        """

        def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
            """Store the database engine and message broker used by the RPC methods."""
            LOGGER.debug('Creating Servicer...')
            self.db_engine = db_engine
            self.messagebroker = messagebroker
            LOGGER.debug('Servicer Created')

        def _get_metrics(self) -> MetricsPool:
            """Return the module-level metrics pool shared by all RPCs of this servicer."""
            return METRICS_POOL

        # ----- Context ----------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
            """Return the context identifiers provided by `context_list_ids`."""
            return context_list_ids(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
            """Return the context objects provided by `context_list_objs`."""
            return context_list_objs(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
            """Return the context identified by `request`, as provided by `context_get`."""
            return context_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
            """Store `request` through `context_set` and return the resulting context id."""
            context_id,updated = context_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
            return context_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
            """Call `context_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = context_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
            """Yield one `ContextEvent` per broker message consumed from `TOPIC_CONTEXT`.

            Each message content is parsed as JSON and expanded into the event's
            keyword arguments.
            """
            for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
                yield ContextEvent(**json.loads(message.content))

        # ----- Topology ---------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
            """Return the topology identifiers `topology_list_ids` provides for the given context id."""
            return topology_list_ids(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
            """Return the topology objects `topology_list_objs` provides for the given context id."""
            return topology_list_objs(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
            """Return the topology identified by `request`, as provided by `topology_get`."""
            return topology_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
            """Store `request` through `topology_set` and return the resulting topology id."""
            topology_id,updated = topology_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
            return topology_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
            """Call `topology_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = topology_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
            """Yield one `TopologyEvent` per broker message consumed from `TOPIC_TOPOLOGY`."""
            for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
                yield TopologyEvent(**json.loads(message.content))

        # ----- Device -----------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
            """Return the device identifiers provided by `device_list_ids`."""
            return device_list_ids(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
            """Return the device objects provided by `device_list_objs`."""
            return device_list_objs(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Device:
            """Return the device identified by `request`, as provided by `device_get`."""
            # The request annotation used to read `ContextId`; the value is a device
            # identifier, consistent with `RemoveDevice`.
            return device_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
            """Store `request` through `device_set` and return the resulting device id."""
            device_id,updated = device_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
            return device_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
            """Call `device_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = device_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
            """Yield one `DeviceEvent` per broker message consumed from `TOPIC_DEVICE`."""
            for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield DeviceEvent(**json.loads(message.content))

        # ----- Link -------------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
            """Return the link identifiers provided by `link_list_ids`."""
            return link_list_ids(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
            """Return the link objects provided by `link_list_objs`."""
            return link_list_objs(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
            """Return the link identified by `request`, as provided by `link_get`."""
            return link_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
            """Store `request` through `link_set` and return the resulting link id."""
            link_id,updated = link_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
            return link_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
            """Call `link_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = link_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_LINK, EventTypeEnum.EVENTTYPE_REMOVE, {'link_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
            """Yield one `LinkEvent` per broker message consumed from `TOPIC_LINK`."""
            for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
                yield LinkEvent(**json.loads(message.content))

        # ----- Service ----------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
            """Return the service identifiers `service_list_ids` provides for the given context id."""
            return service_list_ids(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
            """Return the service objects `service_list_objs` provides for the given context id."""
            return service_list_objs(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
            """Return the service identified by `request`, as provided by `service_get`."""
            return service_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
            """Store `request` through `service_set` and return the resulting service id."""
            service_id,updated = service_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
            return service_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
            """Call `service_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = service_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'service_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
            """Yield one `ServiceEvent` per broker message consumed from `TOPIC_SERVICE`."""
            for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield ServiceEvent(**json.loads(message.content))

        # ----- Slice ----------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
            """Return the slice identifiers `slice_list_ids` provides for the given context id."""
            return slice_list_ids(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
            """Return the slice objects `slice_list_objs` provides for the given context id."""
            return slice_list_objs(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
            """Return the slice identified by `request`, as provided by `slice_get`."""
            return slice_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
            """Store `request` through `slice_set` and return the resulting slice id."""
            slice_id,updated = slice_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
            return slice_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
            """Apply `slice_unset` to `request` and return the slice id it reports."""
            slice_id,updated = slice_unset(self.db_engine, request) # pylint: disable=unused-variable
            #if updated:
            #    notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_UPDATE, {'slice_id': slice_id})
            return slice_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
            """Call `slice_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = slice_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_REMOVE, {'slice_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
            """Yield one `SliceEvent` per broker message consumed from `TOPIC_SLICE`."""
            for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
                yield SliceEvent(**json.loads(message.content))

        # ----- Connection -------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
            """Return the connection identifiers `connection_list_ids` provides for the given service id."""
            return connection_list_ids(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListConnections(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionList:
            """Return the connection objects `connection_list_objs` provides for the given service id."""
            # The request annotation used to read `ContextId`; it is aligned here with
            # `ListConnectionIds`, which takes a service identifier.
            return connection_list_objs(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
            """Return the connection identified by `request`, as provided by `connection_get`."""
            return connection_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
            """Store `request` through `connection_set` and return the resulting connection id."""
            connection_id,updated = connection_set(self.db_engine, request) # pylint: disable=unused-variable
            #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            #notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
            return connection_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
            """Call `connection_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = connection_delete(self.db_engine, request) # pylint: disable=unused-variable
            #if deleted:
            #    event_type = EventTypeEnum.EVENTTYPE_REMOVE
            #    notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': request})
            return Empty()

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
            """Yield one `ConnectionEvent` per broker message consumed from `TOPIC_CONNECTION`."""
            for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
                yield ConnectionEvent(**json.loads(message.content))

        # ----- Policy -----------------------------------------------------------------------------------------------------

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListPolicyRuleIds(self, request : Empty, context : grpc.ServicerContext) -> PolicyRuleIdList:
            """Return the policy-rule identifiers provided by `policyrule_list_ids`."""
            return policyrule_list_ids(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListPolicyRules(self, request : Empty, context : grpc.ServicerContext) -> PolicyRuleList:
            """Return the policy-rule objects provided by `policyrule_list_objs`."""
            return policyrule_list_objs(self.db_engine)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetPolicyRule(self, request : PolicyRuleId, context : grpc.ServicerContext) -> PolicyRule:
            """Return the policy rule identified by `request`, as provided by `policyrule_get`."""
            return policyrule_get(self.db_engine, request)

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetPolicyRule(self, request : PolicyRule, context : grpc.ServicerContext) -> PolicyRuleId:
            """Store `request` through `policyrule_set` and return the resulting policy-rule id."""
            policyrule_id,updated = policyrule_set(self.db_engine, request) # pylint: disable=unused-variable
            return policyrule_id

        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemovePolicyRule(self, request : PolicyRuleId, context : grpc.ServicerContext) -> Empty:
            """Call `policyrule_delete` for `request`; reply `Empty` whatever the helper reports."""
            deleted = policyrule_delete(self.db_engine, request) # pylint: disable=unused-variable
            return Empty()