Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 18.6 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, sqlalchemy
from typing import Iterator
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Empty,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Connection import (
    connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set
from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set
from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.PolicyRule import (
    policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Topology import topology_delete, topology_get, topology_list_ids, topology_list_objs, topology_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Constants import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, #TOPIC_POLICY,
    TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Service label and RPC method names passed to create_metrics() below.
SERVICE_NAME = 'Context'
METHOD_NAMES = [
    # Connection
    'ListConnectionIds', 'ListConnections', 'GetConnection',
    'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    # Context
    'ListContextIds', 'ListContexts', 'GetContext',
    'SetContext', 'RemoveContext', 'GetContextEvents',
    # Topology
    'ListTopologyIds', 'ListTopologies', 'GetTopology',
    'SetTopology', 'RemoveTopology', 'GetTopologyEvents',
    # Device
    'ListDeviceIds', 'ListDevices', 'GetDevice',
    'SetDevice', 'RemoveDevice', 'GetDeviceEvents',
    # Link
    'ListLinkIds', 'ListLinks', 'GetLink',
    'SetLink', 'RemoveLink', 'GetLinkEvents',
    # Service
    'ListServiceIds', 'ListServices', 'GetService',
    'SetService', 'RemoveService', 'GetServiceEvents',
    # Slice
    'ListSliceIds', 'ListSlices', 'GetSlice',
    'SetSlice', 'RemoveSlice', 'GetSliceEvents',
    # Policy rules (no event stream method is listed for these)
    'ListPolicyRuleIds', 'ListPolicyRules', 'GetPolicyRule',
    'SetPolicyRule', 'RemovePolicyRule',
    # Unset operations
    'UnsetService', 'UnsetSlice',
]

# Metrics shared by every RPC method of the servicer defined in this module.
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    """gRPC servicer implementing the Context and ContextPolicy services.

    Each List/Get/Set/Unset/Remove RPC forwards its request, together with the
    SQLAlchemy engine, to the matching helper imported from the ``.database``
    package. The ``Get*Events`` RPCs stream events decoded from messages consumed
    from the message broker.

    Publishing of events from the Set*/Unset*/Remove* RPCs is currently disabled:
    the ``notify_event`` calls are kept, commented out, inside each method.
    """

    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        """Store the database engine and message broker used by the RPC methods."""
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    def _get_metrics(self):
        """Return the module-level METRICS object."""
        return METRICS

    def _stream_events(self, topic, event_class):
        """Yield one ``event_class`` instance per message consumed from ``topic``.

        Each message's ``content`` is parsed as JSON and the resulting mapping is
        expanded into keyword arguments of ``event_class``.
        """
        for message in self.messagebroker.consume({topic}, consume_timeout=CONSUME_TIMEOUT):
            yield event_class(**json.loads(message.content))


    # ----- Context ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
        """Return the result of context_list_ids(); the request is not used."""
        return context_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
        """Return the result of context_list_objs(); the request is not used."""
        return context_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
        """Return the result of context_get() for the requested identifier."""
        return context_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
        """Pass the request to context_set() and return the identifier it yields."""
        context_id,updated = context_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
        return context_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to context_delete() and return Empty."""
        deleted = context_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        """Stream a ContextEvent for each message consumed from TOPIC_CONTEXT."""
        # 'yield from' keeps this method a generator function, as in the original loop.
        yield from self._stream_events(TOPIC_CONTEXT, ContextEvent)


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        """Return the result of topology_list_ids() for the requested ContextId."""
        return topology_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
        """Return the result of topology_list_objs() for the requested ContextId."""
        return topology_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
        """Return the result of topology_get() for the requested identifier."""
        return topology_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
        """Pass the request to topology_set() and return the identifier it yields."""
        topology_id,updated = topology_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
        return topology_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to topology_delete() and return Empty."""
        deleted = topology_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_TOPOLOGY, EventTypeEnum.EVENTTYPE_REMOVE, {'topology_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        """Stream a TopologyEvent for each message consumed from TOPIC_TOPOLOGY."""
        yield from self._stream_events(TOPIC_TOPOLOGY, TopologyEvent)


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
        """Return the result of device_list_ids(); the request is not used."""
        return device_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
        """Return the result of device_list_objs(); the request is not used."""
        return device_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Device:
        """Return the result of device_get() for the requested identifier."""
        # The request was previously annotated as ContextId; DeviceId matches RemoveDevice
        # and the Get<X>(<X>Id) pattern used by every other entity in this class.
        return device_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Pass the request to device_set() and return the identifier it yields."""
        device_id,updated = device_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
        return device_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to device_delete() and return Empty."""
        deleted = device_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_DEVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'device_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        """Stream a DeviceEvent for each message consumed from TOPIC_DEVICE."""
        yield from self._stream_events(TOPIC_DEVICE, DeviceEvent)


    # ----- Link -------------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the result of link_list_ids(); the request is not used."""
        return link_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        """Return the result of link_list_objs(); the request is not used."""
        return link_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
        """Return the result of link_get() for the requested identifier."""
        return link_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
        """Pass the request to link_set() and return the identifier it yields."""
        link_id,updated = link_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
        return link_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to link_delete() and return Empty."""
        deleted = link_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_LINK, EventTypeEnum.EVENTTYPE_REMOVE, {'link_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        """Stream a LinkEvent for each message consumed from TOPIC_LINK."""
        yield from self._stream_events(TOPIC_LINK, LinkEvent)


    # ----- Service ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        """Return the result of service_list_ids() for the requested ContextId."""
        return service_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
        """Return the result of service_list_objs() for the requested ContextId."""
        return service_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
        """Return the result of service_get() for the requested identifier."""
        return service_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Pass the request to service_set() and return the identifier it yields."""
        service_id,updated = service_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
        return service_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to service_delete() and return Empty."""
        deleted = service_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_SERVICE, EventTypeEnum.EVENTTYPE_REMOVE, {'service_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        """Stream a ServiceEvent for each message consumed from TOPIC_SERVICE."""
        yield from self._stream_events(TOPIC_SERVICE, ServiceEvent)


    # ----- Slice ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
        """Return the result of slice_list_ids() for the requested ContextId."""
        return slice_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
        """Return the result of slice_list_objs() for the requested ContextId."""
        return slice_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
        """Return the result of slice_get() for the requested identifier."""
        return slice_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Pass the request to slice_set() and return the identifier it yields."""
        slice_id,updated = slice_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return slice_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Pass the request to slice_unset() and return the identifier it yields."""
        slice_id,updated = slice_unset(self.db_engine, request) # pylint: disable=unused-variable
        #if updated:
        #    notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_UPDATE, {'slice_id': slice_id})
        return slice_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to slice_delete() and return Empty."""
        deleted = slice_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    notify_event(self.messagebroker, TOPIC_SLICE, EventTypeEnum.EVENTTYPE_REMOVE, {'slice_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        """Stream a SliceEvent for each message consumed from TOPIC_SLICE."""
        yield from self._stream_events(TOPIC_SLICE, SliceEvent)


    # ----- Connection -------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        """Return the result of connection_list_ids() for the requested ServiceId."""
        return connection_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListConnections(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionList:
        """Return the result of connection_list_objs() for the request."""
        # NOTE(review): the request was annotated as ContextId; changed to ServiceId to agree
        # with ListConnectionIds above -- confirm against the service's .proto definition.
        return connection_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
        """Return the result of connection_get() for the requested identifier."""
        return connection_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
        """Pass the request to connection_set() and return the identifier it yields."""
        connection_id,updated = connection_set(self.db_engine, request) # pylint: disable=unused-variable
        #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        #notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
        return connection_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to connection_delete() and return Empty."""
        deleted = connection_delete(self.db_engine, request) # pylint: disable=unused-variable
        #if deleted:
        #    event_type = EventTypeEnum.EVENTTYPE_REMOVE
        #    notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': request})
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        """Stream a ConnectionEvent for each message consumed from TOPIC_CONNECTION."""
        yield from self._stream_events(TOPIC_CONNECTION, ConnectionEvent)


    # ----- Policy -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListPolicyRuleIds(self, request : Empty, context : grpc.ServicerContext) -> PolicyRuleIdList:
        """Return the result of policyrule_list_ids(); the request is not used."""
        return policyrule_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ListPolicyRules(self, request : Empty, context : grpc.ServicerContext) -> PolicyRuleList:
        """Return the result of policyrule_list_objs(); the request is not used."""
        return policyrule_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetPolicyRule(self, request : PolicyRuleId, context : grpc.ServicerContext) -> PolicyRule:
        """Return the result of policyrule_get() for the requested identifier."""
        return policyrule_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def SetPolicyRule(self, request : PolicyRule, context : grpc.ServicerContext) -> PolicyRuleId:
        """Pass the request to policyrule_set() and return the identifier it yields."""
        policyrule_id,updated = policyrule_set(self.db_engine, request) # pylint: disable=unused-variable
        return policyrule_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RemovePolicyRule(self, request : PolicyRuleId, context : grpc.ServicerContext) -> Empty:
        """Pass the request to policyrule_delete() and return Empty."""
        deleted = policyrule_delete(self.db_engine, request) # pylint: disable=unused-variable
        return Empty()