Skip to content
Snippets Groups Projects
ContextServiceServicerImpl.py 18.6 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, json, logging, sqlalchemy
from typing import Iterator
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Empty, EventTypeEnum,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Connection import (
    connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set
from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set
from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.PolicyRule import (
    policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.Topology import topology_delete, topology_get, topology_list_ids, topology_list_objs, topology_set
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Events import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE,
    TOPIC_SLICE, TOPIC_TOPOLOGY, notify_event)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Context', 'RPC')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def _get_metrics(self) -> MetricsPool: return METRICS_POOL
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Context ----------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ContextIdList(context_ids=context_list_ids(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ContextList(contexts=context_list_objs(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Context(**context_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_id,updated = context_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
        return ContextId(**context_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_id,deleted = context_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
            yield ContextEvent(**json.loads(message.content))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Topology ---------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return TopologyIdList(topology_ids=topology_list_ids(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return TopologyList(topologies=topology_list_objs(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Topology(**topology_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        topology_id,updated = topology_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
        return TopologyId(**topology_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        topology_id,deleted = topology_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
            yield TopologyEvent(**json.loads(message.content))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Device -----------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return DeviceIdList(device_ids=device_list_ids(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return DeviceList(devices=device_list_objs(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Device(**device_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        device_id,updated = device_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
        return DeviceId(**device_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        device_id,deleted = device_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield DeviceEvent(**json.loads(message.content))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Link -------------------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return LinkIdList(link_ids=link_list_ids(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return LinkList(links=link_list_objs(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Link(**link_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_id,updated = link_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
        return LinkId(**link_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        link_id,deleted = link_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
            yield LinkEvent(**json.loads(message.content))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Service ----------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ServiceIdList(service_ids=service_list_ids(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ServiceList(services=service_list_objs(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Service(**service_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service_id,updated = service_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
        return ServiceId(**service_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service_id,deleted = service_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield ServiceEvent(**json.loads(message.content))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Slice ----------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return SliceIdList(slice_ids=slice_list_ids(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return SliceList(slices=slice_list_objs(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Slice(**slice_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        slice_id,updated = slice_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return SliceId(**slice_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        slice_id,updated = slice_unset(self.db_engine, request)
        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return SliceId(**slice_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        slice_id,deleted = slice_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
            yield SliceEvent(**json.loads(message.content))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Connection -------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ConnectionIdList(connection_ids=connection_list_ids(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ConnectionList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return ConnectionList(connections=connection_list_objs(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Connection(**connection_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        connection_id,updated = connection_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
        return ConnectionId(**connection_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        connection_id,deleted = connection_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
            yield ConnectionEvent(**json.loads(message.content))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Policy -----------------------------------------------------------------------------------------------------

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return PolicyRuleIdList(policyRuleIdList=policyrule_list_ids(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return PolicyRuleList(policyRules=policyrule_list_objs(self.db_engine))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return PolicyRule(**policyrule_get(self.db_engine, request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        policyrule_id,updated = policyrule_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id})
        return PolicyRuleId(**policyrule_id)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        policyrule_id,deleted = policyrule_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id})
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return Empty()