# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum, json, logging, time
from typing import Dict, Iterator, Set
from common.message_broker.Message import Message
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, LinkEvent,
    ServiceEvent, SliceEvent, TopologyEvent, OpticalConfigEvent
)

class EventTopicEnum(enum.Enum):
    """Message-broker topics, one per kind of entity that emits events.

    The value of each member is the topic name string handed to the message
    broker when events are published or consumed.
    """

    CONNECTION = 'connection'
    CONTEXT = 'context'
    DEVICE = 'device'
    LINK = 'link'
    POLICY_RULE = 'policy-rule'
    SERVICE = 'service'
    SLICE = 'slice'
    TOPOLOGY = 'topology'
    OPTICALCONFIG = 'optical-config'
  

# Event class used to decode the messages of each topic, keyed by topic name.
# POLICY_RULE intentionally has no entry: PolicyRuleEvent is not defined in the
# proto files, so there is no class its messages could be decoded into.
TOPIC_TO_EVENTCLASS = {
    EventTopicEnum.CONNECTION.value    : ConnectionEvent,
    EventTopicEnum.CONTEXT.value       : ContextEvent,
    EventTopicEnum.DEVICE.value        : DeviceEvent,
    EventTopicEnum.LINK.value          : LinkEvent,
    EventTopicEnum.SERVICE.value       : ServiceEvent,
    EventTopicEnum.SLICE.value         : SliceEvent,
    EventTopicEnum.TOPOLOGY.value      : TopologyEvent,
    EventTopicEnum.OPTICALCONFIG.value : OpticalConfigEvent,
}

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Default for the `consume_timeout` argument of consume_events(), in seconds.
CONSUME_TIMEOUT = 0.5

def notify_event(
    messagebroker : MessageBroker, topic_enum : EventTopicEnum, event_type : EventTypeEnum, fields : Dict[str, str]
) -> None:
    """Publish a JSON-encoded event on the topic named by `topic_enum`.

    The published document has an 'event' entry holding the current time
    (`time.time()`) and `event_type`, followed by every entry of `fields`
    copied to the top level. An 'event' key in `fields` replaces the
    generated header.

    Args:
        messagebroker: broker whose `publish` method receives the message.
        topic_enum: topic to publish on; its `.value` is the topic name.
        event_type: kind of event; must be serializable by `json.dumps`.
        fields: extra top-level entries of the event. The wrappers in this
            module pass a single identifier dict here.
    """
    header = {'timestamp': {'timestamp': time.time()}, 'event_type': event_type}
    # `fields` is unpacked last so its entries override the header on a key clash.
    payload = {'event': header, **fields}
    message = Message(topic_enum.value, json.dumps(payload))
    messagebroker.publish(message)

def notify_event_context(messagebroker : MessageBroker, event_type : EventTypeEnum, context_id : Dict) -> None:
    """Publish an `event_type` event carrying `context_id` on the CONTEXT topic."""
    fields = {'context_id': context_id}
    notify_event(messagebroker, EventTopicEnum.CONTEXT, event_type, fields)

def notify_event_topology(messagebroker : MessageBroker, event_type : EventTypeEnum, topology_id : Dict) -> None:
    """Publish an `event_type` event carrying `topology_id` on the TOPOLOGY topic."""
    fields = {'topology_id': topology_id}
    notify_event(messagebroker, EventTopicEnum.TOPOLOGY, event_type, fields)

def notify_event_device(messagebroker : MessageBroker, event_type : EventTypeEnum, device_id : Dict) -> None:
    """Publish an `event_type` event carrying `device_id` on the DEVICE topic."""
    fields = {'device_id': device_id}
    notify_event(messagebroker, EventTopicEnum.DEVICE, event_type, fields)
def notify_event_opticalconfig(messagebroker : MessageBroker, event_type : EventTypeEnum, opticalconfig_id : Dict) -> None:
    """Publish an `event_type` event carrying `opticalconfig_id` on the OPTICALCONFIG topic.

    Args:
        messagebroker: broker the event is published through.
        event_type: kind of event being notified.
        opticalconfig_id: identifier of the affected optical configuration,
            emitted under the 'opticalconfig_id' key.
    """
    # Must go out on OPTICALCONFIG, not DEVICE: consumers choose the event class
    # from the topic name, and only the 'optical-config' topic is mapped to
    # OpticalConfigEvent, the class that matches this payload.
    fields = {'opticalconfig_id': opticalconfig_id}
    notify_event(messagebroker, EventTopicEnum.OPTICALCONFIG, event_type, fields)

def notify_event_link(messagebroker : MessageBroker, event_type : EventTypeEnum, link_id : Dict) -> None:
    """Publish an `event_type` event carrying `link_id` on the LINK topic."""
    fields = {'link_id': link_id}
    notify_event(messagebroker, EventTopicEnum.LINK, event_type, fields)

def notify_event_service(messagebroker : MessageBroker, event_type : EventTypeEnum, service_id : Dict) -> None:
    """Publish an `event_type` event carrying `service_id` on the SERVICE topic."""
    fields = {'service_id': service_id}
    notify_event(messagebroker, EventTopicEnum.SERVICE, event_type, fields)

def notify_event_slice(messagebroker : MessageBroker, event_type : EventTypeEnum, slice_id : Dict) -> None:
    """Publish an `event_type` event carrying `slice_id` on the SLICE topic."""
    fields = {'slice_id': slice_id}
    notify_event(messagebroker, EventTopicEnum.SLICE, event_type, fields)

def notify_event_connection(messagebroker : MessageBroker, event_type : EventTypeEnum, connection_id : Dict) -> None:
    """Publish an `event_type` event carrying `connection_id` on the CONNECTION topic."""
    fields = {'connection_id': connection_id}
    notify_event(messagebroker, EventTopicEnum.CONNECTION, event_type, fields)

def notify_event_policy_rule(messagebroker : MessageBroker, event_type : EventTypeEnum, policyrule_id : Dict) -> None:
    """Publish an `event_type` event carrying `policyrule_id` on the POLICY_RULE topic.

    NOTE: this module maps no event class to the POLICY_RULE topic, so
    consume_events() logs a warning and skips messages published here.
    """
    fields = {'policyrule_id': policyrule_id}
    notify_event(messagebroker, EventTopicEnum.POLICY_RULE, event_type, fields)

def consume_events(
    messagebroker : MessageBroker, topic_enums : Set[EventTopicEnum], consume_timeout : float = CONSUME_TIMEOUT
) -> Iterator:
    """Yield the events received on the given topics, decoded into event objects.

    Each message obtained from `messagebroker.consume` is parsed as JSON and
    its top-level entries are passed as keyword arguments to the event class
    registered for the message's topic in TOPIC_TO_EVENTCLASS.

    Args:
        messagebroker: broker to consume messages from.
        topic_enums: topics to subscribe to.
        consume_timeout: forwarded unchanged to `messagebroker.consume`;
            defaults to CONSUME_TIMEOUT (seconds).

    Yields:
        One event object per message whose topic has a registered event class.
        Messages on topics without one are logged at warning level and skipped.

    Raises:
        Whatever `json.loads` or the event-class constructor raises for a
        malformed message; such errors are not caught here.
    """
    topic_names = [topic_enum.value for topic_enum in topic_enums]
    for message in messagebroker.consume(topic_names, consume_timeout=consume_timeout):
        event_class = TOPIC_TO_EVENTCLASS.get(message.topic)
        if event_class is None:
            # Let the logger interpolate so the text is only built if the record is emitted.
            LOGGER.warning('No EventClass defined for Topic(%s). Ignoring...', message.topic)
            continue
        yield event_class(**json.loads(message.content))