diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py index cb8c194eb0f100b5fe11c523a4618cf41891213b..abcb382f622bd8e34d107442ba7f1a315cf8e205 100644 --- a/src/interdomain/service/topology_abstractor/DltRecorder.py +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -30,6 +30,7 @@ class DLTRecorder(threading.Thread): self.terminate = threading.Event() self.context_client = ContextClient() self.context_event_collector = EventsCollector(self.context_client) + self.topology_cache = {} def stop(self): self.terminate.set() @@ -105,6 +106,8 @@ class DLTRecorder(threading.Thread): topology_details = self.context_client.GetTopologyDetails(topology_id) topology_name = topology_details.name + self.topology_cache[topology_uuid] = topology_details + if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): for device in topology_details.devices: @@ -118,16 +121,34 @@ class DLTRecorder(threading.Thread): args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) LOGGER.warning(MSG.format(*args)) + def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: + for topology_id, details in self.topology_cache.items(): + for device in details.devices: + if device.device_id == device_id: + return topology_id + return None -#Check which ID to use. + def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: + for topology_id, details in self.topology_cache.items(): + for link in details.links: + if link.link_id == link_id: + return topology_id + return None def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: device_id = event.device_id - device_uuid = device_id.device_uuid.uuid device = self.context_client.GetDevice(device_id) - dlt_record_sender.add_device(device_id.context_id, device) + topology_id = self.find_topology_for_device(device_id) + if topology_id: + dlt_record_sender.add_device(topology_id, device) + else: + LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: link_id = event.link_id link = self.context_client.GetLink(link_id) - dlt_record_sender.add_link(link_id.context_id, link) + topology_id = self.find_topology_for_link(link_id) + if topology_id: + dlt_record_sender.add_link(topology_id, link) + else: + LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}") diff --git a/src/interdomain/service/topology_abstractor/DltRecorderOld.py b/src/interdomain/service/topology_abstractor/DltRecorderOld.py new file mode 100644 index 0000000000000000000000000000000000000000..cb8c194eb0f100b5fe11c523a4618cf41891213b --- /dev/null +++ b/src/interdomain/service/topology_abstractor/DltRecorderOld.py @@ -0,0 +1,133 @@ +import logging +import threading +from typing import Dict, Optional + +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name +from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent +from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import get_uuids_of_devices_in_topology +from common.tools.context_queries.Topology import create_missing_topologies +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Device import json_device_id +from common.tools.object_factory.Topology import json_topology_id +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from .DltRecordSender import DltRecordSender +from .Types import EventTypes + +LOGGER = logging.getLogger(__name__) + +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) +INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) + + +class DLTRecorder(threading.Thread): + def __init__(self) -> None: + super().__init__(daemon=True) + self.terminate = threading.Event() + self.context_client = ContextClient() + self.context_event_collector = EventsCollector(self.context_client) + + def stop(self): + self.terminate.set() + + def run(self) -> None: + self.context_client.connect() + create_context(self.context_client, DEFAULT_CONTEXT_NAME) + self.create_topologies() + self.context_event_collector.start() + + while not self.terminate.is_set(): + event = self.context_event_collector.get_event(timeout=0.1) + if event is None: + continue + LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) + self.update_record(event) + + self.context_event_collector.stop() + self.context_client.close() + + def create_topologies(self): + topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] + create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + + def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: + env_vars = find_environment_variables([ + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ]) + if len(env_vars) == 2: + dlt_connector_client = DltConnectorClient() + dlt_connector_client.connect() + return dlt_connector_client + return None + + def update_record(self, event: EventTypes) -> None: + dlt_connector_client = self.get_dlt_connector_client() + dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) + + if isinstance(event, ContextEvent): + LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + + elif isinstance(event, TopologyEvent): + LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_topology_event(event, dlt_record_sender) + + elif isinstance(event, DeviceEvent): + LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_device_event(event, dlt_record_sender) + + elif isinstance(event, LinkEvent): + LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_link_event(event, dlt_record_sender) + + else: + LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) + + dlt_record_sender.commit() + if dlt_connector_client is not None: + dlt_connector_client.close() + + def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: + topology_id = event.topology_id + topology_uuid = topology_id.topology_uuid.uuid + context_id = topology_id.context_id + context_uuid = context_id.context_uuid.uuid + topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} + + context = self.context_client.GetContext(context_id) + context_name = context.name + + topology_details = self.context_client.GetTopologyDetails(topology_id) + topology_name = topology_details.name + + if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ + (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + for device in topology_details.devices: + dlt_record_sender.add_device(topology_id, device) + + for link in topology_details.links: + dlt_record_sender.add_link(topology_id, link) + + else: + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + + +#Check which ID to use. + + def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: + device_id = event.device_id + device_uuid = device_id.device_uuid.uuid + device = self.context_client.GetDevice(device_id) + dlt_record_sender.add_device(device_id.context_id, device) + + def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: + link_id = event.link_id + link = self.context_client.GetLink(link_id) + dlt_record_sender.add_link(link_id.context_id, link)