Commit e94c55a8 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- Moved Events to database sub-module
- Improved API for notification publication and consumption
- Moved construction of gRPC reply objects to database sub-module
- Moved generation of context change notifications to database sub-modules
- Minor cosmetic improvements
parent 38233dfc
Loading
Loading
Loading
Loading
+62 −133
Original line number Diff line number Diff line
@@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, sqlalchemy
import grpc, logging, sqlalchemy
from typing import Iterator
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceFilter, DeviceId, DeviceIdList, DeviceList,
    Empty, EndPointIdList, EndPointNameList, EventTypeEnum,
    Empty, EndPointIdList, EndPointNameList,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList,
@@ -33,16 +33,16 @@ from .database.Connection import (
from .database.Context import context_delete, context_get, context_list_ids, context_list_objs, context_set
from .database.Device import device_delete, device_get, device_list_ids, device_list_objs, device_select, device_set
from .database.EndPoint import endpoint_list_names
from .database.Events import EventTopicEnum, consume_events
from .database.Link import link_delete, link_get, link_list_ids, link_list_objs, link_set
from .database.PolicyRule import (
    policyrule_delete, policyrule_get, policyrule_list_ids, policyrule_list_objs, policyrule_set)
from .database.Service import service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set, service_unset
from .database.Slice import slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset
from .database.Service import (
    service_delete, service_get, service_list_ids, service_list_objs, service_select, service_set, service_unset)
from .database.Slice import (
    slice_delete, slice_get, slice_list_ids, slice_list_objs, slice_select, slice_set, slice_unset)
from .database.Topology import (
    topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set)
from .Events import (
    CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE,
    TOPIC_SLICE, TOPIC_TOPOLOGY, notify_event)

LOGGER = logging.getLogger(__name__)

@@ -62,308 +62,237 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListContextIds(self, request : Empty, context : grpc.ServicerContext) -> ContextIdList:
        return ContextIdList(context_ids=context_list_ids(self.db_engine))
        return context_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListContexts(self, request : Empty, context : grpc.ServicerContext) -> ContextList:
        return ContextList(contexts=context_list_objs(self.db_engine))
        return context_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetContext(self, request : ContextId, context : grpc.ServicerContext) -> Context:
        return Context(**context_get(self.db_engine, request))
        return context_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetContext(self, request : Context, context : grpc.ServicerContext) -> ContextId:
        context_id,updated = context_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
        return ContextId(**context_id)
        return context_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveContext(self, request : ContextId, context : grpc.ServicerContext) -> Empty:
        context_id,deleted = context_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': context_id})
        return Empty()
        return context_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetContextEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
            yield ContextEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.CONTEXT}): yield message


    # ----- Topology ---------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListTopologyIds(self, request : ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        return TopologyIdList(topology_ids=topology_list_ids(self.db_engine, request))
        return topology_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListTopologies(self, request : ContextId, context : grpc.ServicerContext) -> TopologyList:
        return TopologyList(topologies=topology_list_objs(self.db_engine, request))
        return topology_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Topology:
        return Topology(**topology_get(self.db_engine, request))
        return topology_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetTopologyDetails(self, request : TopologyId, context : grpc.ServicerContext) -> TopologyDetails:
        return TopologyDetails(**topology_get_details(self.db_engine, request))
        return topology_get_details(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetTopology(self, request : Topology, context : grpc.ServicerContext) -> TopologyId:
        topology_id,updated = topology_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
        return TopologyId(**topology_id)
        return topology_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveTopology(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        topology_id,deleted = topology_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': topology_id})
        return Empty()
        return topology_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetTopologyEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
            yield TopologyEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.TOPOLOGY}): yield message


    # ----- Device -----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListDeviceIds(self, request : Empty, context : grpc.ServicerContext) -> DeviceIdList:
        return DeviceIdList(device_ids=device_list_ids(self.db_engine))
        return device_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListDevices(self, request : Empty, context : grpc.ServicerContext) -> DeviceList:
        return DeviceList(devices=device_list_objs(self.db_engine))
        return device_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetDevice(self, request : ContextId, context : grpc.ServicerContext) -> Device:
        return Device(**device_get(self.db_engine, request))
        return device_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id,updated = device_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
        return DeviceId(**device_id)
        return device_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_id,deleted = device_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': device_id})
        return Empty()
        return device_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectDevice(self, request : DeviceFilter, context : grpc.ServicerContext) -> DeviceList:
        return DeviceList(devices=device_select(self.db_engine, request))
        return device_select(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetDeviceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield DeviceEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.DEVICE}): yield message

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListEndPointNames(self, request : EndPointIdList, context : grpc.ServicerContext) -> EndPointNameList:
        return EndPointNameList(endpoint_names=endpoint_list_names(self.db_engine, request))
        return endpoint_list_names(self.db_engine, request)


    # ----- Link -------------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
        return LinkIdList(link_ids=link_list_ids(self.db_engine))
        return link_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        return LinkList(links=link_list_objs(self.db_engine))
        return link_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
        return Link(**link_get(self.db_engine, request))
        return link_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
        link_id,updated = link_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
        return LinkId(**link_id)
        return link_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
        link_id,deleted = link_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': link_id})
        return Empty()
        return link_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetLinkEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
            yield LinkEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.LINK}): yield message


    # ----- Service ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListServiceIds(self, request : ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        return ServiceIdList(service_ids=service_list_ids(self.db_engine, request))
        return service_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListServices(self, request : ContextId, context : grpc.ServicerContext) -> ServiceList:
        return ServiceList(services=service_list_objs(self.db_engine, request))
        return service_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetService(self, request : ServiceId, context : grpc.ServicerContext) -> Service:
        return Service(**service_get(self.db_engine, request))
        return service_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        service_id,updated = service_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
        return ServiceId(**service_id)
        return service_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UnsetService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        service_id,updated = service_unset(self.db_engine, request)
        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
        return ServiceId(**service_id)
        return service_unset(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        service_id,deleted = service_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': service_id})
        return Empty()
        return service_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectService(self, request : ServiceFilter, context : grpc.ServicerContext) -> ServiceList:
        return ServiceList(services=service_select(self.db_engine, request))
        return service_select(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetServiceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
            yield ServiceEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.SERVICE}): yield message


    # ----- Slice ----------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListSliceIds(self, request : ContextId, context : grpc.ServicerContext) -> SliceIdList:
        return SliceIdList(slice_ids=slice_list_ids(self.db_engine, request))
        return slice_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListSlices(self, request : ContextId, context : grpc.ServicerContext) -> SliceList:
        return SliceList(slices=slice_list_objs(self.db_engine, request))
        return slice_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSlice(self, request : SliceId, context : grpc.ServicerContext) -> Slice:
        return Slice(**slice_get(self.db_engine, request))
        return slice_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        slice_id,updated = slice_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return SliceId(**slice_id)
        return slice_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UnsetSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        slice_id,updated = slice_unset(self.db_engine, request)
        if updated:
            event_type = EventTypeEnum.EVENTTYPE_UPDATE
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return SliceId(**slice_id)
        return slice_unset(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty:
        slice_id,deleted = slice_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_SLICE, event_type, {'slice_id': slice_id})
        return Empty()
        return slice_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectSlice(self, request : SliceFilter, context : grpc.ServicerContext) -> SliceList:
        return SliceList(slices=slice_select(self.db_engine, request))
        return slice_select(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSliceEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        for message in self.messagebroker.consume({TOPIC_SLICE}, consume_timeout=CONSUME_TIMEOUT):
            yield SliceEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.SLICE}): yield message


    # ----- Connection -------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListConnectionIds(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        return ConnectionIdList(connection_ids=connection_list_ids(self.db_engine, request))
        return connection_list_ids(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListConnections(self, request : ContextId, context : grpc.ServicerContext) -> ConnectionList:
        return ConnectionList(connections=connection_list_objs(self.db_engine, request))
        return connection_list_objs(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Connection:
        return Connection(**connection_get(self.db_engine, request))
        return connection_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetConnection(self, request : Connection, context : grpc.ServicerContext) -> ConnectionId:
        connection_id,updated = connection_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
        return ConnectionId(**connection_id)
        return connection_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveConnection(self, request : ConnectionId, context : grpc.ServicerContext) -> Empty:
        connection_id,deleted = connection_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': connection_id})
        return Empty()
        return connection_delete(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetConnectionEvents(self, request : Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
            yield ConnectionEvent(**json.loads(message.content))
        for message in consume_events(self.messagebroker, {EventTopicEnum.CONNECTION}): yield message


    # ----- Policy -----------------------------------------------------------------------------------------------------
    # ----- Policy Rule ------------------------------------------------------------------------------------------------

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListPolicyRuleIds(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleIdList:
        return PolicyRuleIdList(policyRuleIdList=policyrule_list_ids(self.db_engine))
        return policyrule_list_ids(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListPolicyRules(self, request : Empty, context: grpc.ServicerContext) -> PolicyRuleList:
        return PolicyRuleList(policyRules=policyrule_list_objs(self.db_engine))
        return policyrule_list_objs(self.db_engine)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetPolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> PolicyRule:
        return PolicyRule(**policyrule_get(self.db_engine, request))
        return policyrule_get(self.db_engine, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetPolicyRule(self, request : PolicyRule, context: grpc.ServicerContext) -> PolicyRuleId:
        policyrule_id,updated = policyrule_set(self.db_engine, request)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id})
        return PolicyRuleId(**policyrule_id)
        return policyrule_set(self.db_engine, self.messagebroker, request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemovePolicyRule(self, request : PolicyRuleId, context: grpc.ServicerContext) -> Empty:
        policyrule_id,deleted = policyrule_delete(self.db_engine, request)
        if deleted:
            event_type = EventTypeEnum.EVENTTYPE_REMOVE
            notify_event(self.messagebroker, TOPIC_POLICY, event_type, {'policyrule_id': policyrule_id})
        return Empty()
        return policyrule_delete(self.db_engine, self.messagebroker, request)

src/context/service/Events.py

deleted100644 → 0
+0 −42
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, time
from typing import Dict
from common.message_broker.Message import Message
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import EventTypeEnum

TOPIC_CONNECTION = 'connection'
TOPIC_CONTEXT    = 'context'
TOPIC_DEVICE     = 'device'
TOPIC_LINK       = 'link'
TOPIC_POLICY     = 'policy'
TOPIC_SERVICE    = 'service'
TOPIC_SLICE      = 'slice'
TOPIC_TOPOLOGY   = 'topology'

TOPICS = {
    TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_POLICY, TOPIC_SERVICE, TOPIC_SLICE, TOPIC_TOPOLOGY
}

CONSUME_TIMEOUT = 0.5 # seconds

def notify_event(
    messagebroker : MessageBroker, topic_name : str, event_type : EventTypeEnum, fields : Dict[str, str]
) -> None:
    event = {'event': {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}}
    for field_name, field_value in fields.items():
        event[field_name] = field_value
    messagebroker.publish(Message(topic_name, json.dumps(event)))
+22 −11

File changed.

Preview size limit exceeded, changes collapsed.

+22 −12

File changed.

Preview size limit exceeded, changes collapsed.

+26 −14

File changed.

Preview size limit exceeded, changes collapsed.

Loading