Skip to content
Snippets Groups Projects
Select Git revision
  • 59af354425c1a29b809b5c352be9ee49865f59e8
  • master default
  • cnit_ofc26
  • feat/344-implement-a-new-firewall-agent-controllable-through-restconf-openconfig
  • feat/343-integration-of-mimir-deployment-in-production-environment
  • ofc_polimi
  • feat/305-cttc-enhanced-netconf-openconfig-sbi-driver-for-dscm-pluggables
  • feat/306-cttc-enhanced-restconf-based-openconfig-nbi-for-dscm-pluggables
  • feat/301-cttc-dscm-pluggables
  • CTTC-IMPLEMENT-NBI-CONNECTOR-NOS-ZTP
  • CTTC-TEST-SMARTNICS-6GMICROSDN-ZTP
  • develop protected
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit_tapi
  • feat/330-tid-pcep-component
  • feat/tid-newer-pcep-component
  • feat/116-ubi-updates-in-telemetry-backend-to-support-p4-in-band-network-telemetry
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

ContextServiceServicerImpl.py

Blame
  • gifrerenom's avatar
    Lluis Gifre Renom authored
    - configured constant with event collection timeout for unitary tests and debug purposes
    - cosmetic changes
    - migrated event reporting for Connection entity
    59af3544
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    ContextServiceServicerImpl.py 18.23 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import grpc, json, logging, sqlalchemy
    from typing import Iterator
    from common.message_broker.MessageBroker import MessageBroker
    from common.proto.context_pb2 import (
        Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
        Context, ContextEvent, ContextId, ContextIdList, ContextList,
        Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
        Empty, EventTypeEnum,
        Link, LinkEvent, LinkId, LinkIdList, LinkList,
        Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
        Slice, SliceEvent, SliceId, SliceIdList, SliceList,
        Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
    from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
    from common.proto.context_pb2_grpc import ContextServiceServicer
    from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    from .database.Connection import (
        connection_delete, connection_get, connection_list_ids, connection_list_objs, connection_set)
    from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set
    from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_set
    from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
    from .database.PolicyRule import (
        policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set)
    from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_set
    from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_set, slice_unset
    from .database.Topology import topology_delete, topology_get, topology_list_ids, topology_list_objs, topology_set
    from .Events import (
        CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, #TOPIC_POLICY,
        TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY, notify_event)
    
    LOGGER = logging.getLogger(__name__)
    
    METRICS_POOL = MetricsPool('Context', 'RPC')
    
    class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
        def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
            LOGGER.debug('Creating Servicer...')
            self.db_engine = db_engine
            self.messagebroker = messagebroker
            LOGGER.debug('Servicer Created')
    
        def _get_metrics(self) -> MetricsPool: return METRICS_POOL
    
    
        # ----- Context ----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
            return ContextIdList(context_ids=context_list_ids(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
            return ContextList(contexts=context_list_objs(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
            return Context(**context_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
            context_id,updated = context_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
            return ContextId(**context_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
            context_id,deleted = context_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
            for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
                yield ContextEvent(**json.loads(message.content))
    
    
        # ----- Topology ---------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
            return TopologyIdList(topology_ids=topology_list_ids(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
            return TopologyList(topologies=topology_list_objs(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
            return Topology(**topology_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
            topology_id,updated = topology_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
            return TopologyId(**topology_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
            topology_id,deleted = topology_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
            for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
                yield TopologyEvent(**json.loads(message.content))
    
    
        # ----- Device -----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
            return DeviceIdList(device_ids=device_list_ids(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
            return DeviceList(devices=device_list_objs(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device:
            return Device(**device_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
            device_id,updated = device_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
            return DeviceId(**device_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
            device_id,deleted = device_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
            for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield DeviceEvent(**json.loads(message.content))
    
    
        # ----- Link -------------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
            return LinkIdList(link_ids=link_list_ids(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
            return LinkList(links=link_list_objs(self.db_engine))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
            return Link(**link_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
            link_id,updated = link_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
            return LinkId(**link_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
            link_id,deleted = link_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
            for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
                yield LinkEvent(**json.loads(message.content))
    
    
        # ----- Service ----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
            return ServiceIdList(service_ids=service_list_ids(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
            return ServiceList(services=service_list_objs(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
            return Service(**service_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
            service_id,updated = service_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
            return ServiceId(**service_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
            service_id,deleted = service_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
            for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
                yield ServiceEvent(**json.loads(message.content))
    
    
        # ----- Slice ----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
            return SliceIdList(slice_ids=slice_list_ids(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
            return SliceList(slices=slice_list_objs(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
            return Slice(**slice_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
            slice_id,updated = slice_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
            return SliceId(**slice_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
            slice_id,updated = slice_unset(self.db_engine, request)
            if updated:
                event_type = EventTypeEnum.EVENTTYPE_UPDATE
                notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
            return SliceId(**slice_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
            slice_id,deleted = slice_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
            for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
                yield SliceEvent(**json.loads(message.content))
    
    
        # ----- Connection -------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
            return ConnectionIdList(connection_ids=connection_list_ids(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ConnectionList:
            return ConnectionList(connections=connection_list_objs(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
            return Connection(**connection_get(self.db_engine, request))
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
            connection_id,updated = connection_set(self.db_engine, request)
            event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
            return ConnectionId(**connection_id)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
            connection_id,deleted = connection_delete(self.db_engine, request)
            if deleted:
                event_type = EventTypeEnum.EVENTTYPE_REMOVE
                notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
            for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
                yield ConnectionEvent(**json.loads(message.content))
    
    
        # ----- Policy -----------------------------------------------------------------------------------------------------
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList:
            return policyrule_list_ids(self.db_engine)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList:
            return policyrule_list_objs(self.db_engine)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule:
            return policyrule_get(self.db_engine, request)
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId:
            policyrule_id,updated = policyrule_set(self.db_engine, request) # pylint: disable=unused-variable
            return policyrule_id
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty:
            deleted = policyrule_delete(self.db_engine, request) # pylint: disable=unused-variable
            return Empty()