diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index 8c392821ef9720b04c5eaa160901bd6e1ecd7e5e..3f5ccd18385d47eedbf649235d6fb095c9e6657a 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -13,6 +13,8 @@ # limitations under the License. import logging, signal, sys, threading +import asyncio + from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( @@ -27,6 +29,12 @@ from .RemoteDomainClients import RemoteDomainClients terminate = threading.Event() LOGGER : logging.Logger = None + +async def run_dlt_recorder(dlt_recorder): + await dlt_recorder.start() + await terminate.wait() + await dlt_recorder.stop() + def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') terminate.set() @@ -76,7 +84,9 @@ def main(): if dlt_enabled: LOGGER.info('Starting DLT functionality...') dlt_recorder = DLTRecorder() - dlt_recorder.start() + #dlt_recorder.start() + loop = asyncio.get_event_loop() + loop.run_until_complete(run_dlt_recorder(dlt_recorder)) # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass @@ -85,7 +95,9 @@ def main(): # if topology_abstractor_enabled: # topology_abstractor.stop() if dlt_enabled: - dlt_recorder.stop() + #dlt_recorder.stop() + loop.run_until_complete(dlt_recorder.stop()) + loop.close() grpc_service.stop() remote_domain_clients.stop() diff --git a/src/interdomain/service/topology_abstractor/DltRecorder copy.py b/src/interdomain/service/topology_abstractor/DltRecorder copy.py new file mode 100644 index 0000000000000000000000000000000000000000..0e94159c4c9f2103941c6bdad4cdfd18e7cc2419 --- /dev/null +++ b/src/interdomain/service/topology_abstractor/DltRecorder copy.py @@ -0,0 +1,166 @@ +import logging +import threading +from typing import Dict, Optional + +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name +from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent +from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import get_uuids_of_devices_in_topology +from common.tools.context_queries.Topology import create_missing_topologies +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Device import json_device_id +from common.tools.object_factory.Topology import json_topology_id +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from .DltRecordSender import DltRecordSender +from .Types import EventTypes + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) +INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) + + +class DLTRecorder(threading.Thread): + def __init__(self) -> None: + super().__init__(daemon=True) + self.terminate = threading.Event() + self.context_client = ContextClient() + self.context_event_collector = EventsCollector(self.context_client) + self.topology_cache: Dict[str, TopologyId] = {} + + def stop(self): + self.terminate.set() + + def run(self) -> None: + self.context_client.connect() + create_context(self.context_client, DEFAULT_CONTEXT_NAME) + self.create_topologies() + self.context_event_collector.start() + + while not self.terminate.is_set(): + event = self.context_event_collector.get_event(timeout=0.1) + if event is None: + continue + LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) + self.update_record(event) + + self.context_event_collector.stop() + self.context_client.close() + + def create_topologies(self): + topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] + create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + + def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: + # Always enable DLT for testing + dlt_connector_client = DltConnectorClient() + dlt_connector_client.connect() + return dlt_connector_client + # env_vars = find_environment_variables([ + # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), + # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # ]) + # if len(env_vars) == 2: + # dlt_connector_client = DltConnectorClient() + # dlt_connector_client.connect() + # return dlt_connector_client + # return None + + def update_record(self, event: EventTypes) -> None: + dlt_connector_client = self.get_dlt_connector_client() + dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) + + if isinstance(event, ContextEvent): + LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + + elif isinstance(event, TopologyEvent): + LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_topology_event(event, dlt_record_sender) + + elif isinstance(event, DeviceEvent): + LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_device_event(event, dlt_record_sender) + + elif isinstance(event, LinkEvent): + LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_link_event(event, dlt_record_sender) + + else: + LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) + + dlt_record_sender.commit() + if dlt_connector_client is not None: + dlt_connector_client.close() + + def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: + topology_id = event.topology_id + topology_uuid = topology_id.topology_uuid.uuid + context_id = topology_id.context_id + context_uuid = context_id.context_uuid.uuid + topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} + + context = self.context_client.GetContext(context_id) + context_name = context.name + + topology_details = self.context_client.GetTopologyDetails(topology_id) + topology_name = topology_details.name + + self.topology_cache[topology_uuid] = topology_id + + LOGGER.debug('TOPOLOGY Details({:s})'.format(grpc_message_to_json_string(topology_details))) + + if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ + (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + for device in topology_details.devices: + LOGGER.debug('DEVICE_INFO_TOPO({:s})'.format(grpc_message_to_json_string(device))) + dlt_record_sender.add_device(topology_id, device) + + for link in topology_details.links: + dlt_record_sender.add_link(topology_id, link) + + else: + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + + def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: + for topology_uuid, topology_id in self.topology_cache.items(): + details = self.context_client.GetTopologyDetails(topology_id) + for device in details.devices: + if device.device_id == device_id: + return topology_id + return None + + def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: + for topology_uuid, topology_id in self.topology_cache.items(): + details = self.context_client.GetTopologyDetails(topology_id) + for link in details.links: + if link.link_id == link_id: + return topology_id + return None + + def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: + device_id = event.device_id + device = self.context_client.GetDevice(device_id) + topology_id = self.find_topology_for_device(device_id) + if topology_id: + LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id))) + + dlt_record_sender.add_device(topology_id, device) + else: + LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") + + def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: + link_id = event.link_id + link = self.context_client.GetLink(link_id) + topology_id = self.find_topology_for_link(link_id) + if topology_id: + dlt_record_sender.add_link(topology_id, link) + else: + LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}") diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py index 0e94159c4c9f2103941c6bdad4cdfd18e7cc2419..96c97f5a2bb0a242840ed0dd5df65bd562adf938 100644 --- a/src/interdomain/service/topology_abstractor/DltRecorder.py +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -1,10 +1,12 @@ +# DltRecorder.py + import logging -import threading +import asyncio from typing import Dict, Optional from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name -from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent +from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent from common.tools.context_queries.Context import create_context from common.tools.context_queries.Device import get_uuids_of_devices_in_topology from common.tools.context_queries.Topology import create_missing_topologies @@ -25,54 +27,47 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) -class DLTRecorder(threading.Thread): +class DLTRecorder: def __init__(self) -> None: - super().__init__(daemon=True) - self.terminate = threading.Event() + self.terminate_event = asyncio.Event() self.context_client = ContextClient() self.context_event_collector = EventsCollector(self.context_client) self.topology_cache: Dict[str, TopologyId] = {} - def stop(self): - self.terminate.set() + async def stop(self): + self.terminate_event.set() + + async def start(self) -> None: + await self.run() - def run(self) -> None: - self.context_client.connect() + async def run(self) -> None: + await self.context_client.connect() create_context(self.context_client, DEFAULT_CONTEXT_NAME) self.create_topologies() - self.context_event_collector.start() + await self.context_event_collector.start() - while not self.terminate.is_set(): - event = self.context_event_collector.get_event(timeout=0.1) + while not self.terminate_event.is_set(): + event = await self.context_event_collector.get_event(timeout=0.1) if event is None: continue LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - self.update_record(event) + await self.update_record(event) - self.context_event_collector.stop() - self.context_client.close() + await self.context_event_collector.stop() + await self.context_client.close() def create_topologies(self): topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) - def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: + async def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: # Always enable DLT for testing dlt_connector_client = DltConnectorClient() - dlt_connector_client.connect() + await dlt_connector_client.connect() return dlt_connector_client - # env_vars = find_environment_variables([ - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - # ]) - # if len(env_vars) == 2: - # dlt_connector_client = DltConnectorClient() - # dlt_connector_client.connect() - # return dlt_connector_client - # return None - - def update_record(self, event: EventTypes) -> None: - dlt_connector_client = self.get_dlt_connector_client() + + async def update_record(self, event: EventTypes) -> None: + dlt_connector_client = await self.get_dlt_connector_client() dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) if isinstance(event, ContextEvent): @@ -81,34 +76,34 @@ class DLTRecorder(threading.Thread): elif isinstance(event, TopologyEvent): LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_topology_event(event, dlt_record_sender) + await self.process_topology_event(event, dlt_record_sender) elif isinstance(event, DeviceEvent): LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_device_event(event, dlt_record_sender) + await self.process_device_event(event, dlt_record_sender) elif isinstance(event, LinkEvent): LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_link_event(event, dlt_record_sender) + await self.process_link_event(event, dlt_record_sender) else: LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - dlt_record_sender.commit() + await dlt_record_sender.commit() if dlt_connector_client is not None: - dlt_connector_client.close() + await dlt_connector_client.close() - def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: + async def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: topology_id = event.topology_id topology_uuid = topology_id.topology_uuid.uuid context_id = topology_id.context_id context_uuid = context_id.context_uuid.uuid topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} - context = self.context_client.GetContext(context_id) + context = await self.context_client.GetContext(context_id) context_name = context.name - topology_details = self.context_client.GetTopologyDetails(topology_id) + topology_details = await self.context_client.GetTopologyDetails(topology_id) topology_name = topology_details.name self.topology_cache[topology_uuid] = topology_id @@ -129,26 +124,26 @@ class DLTRecorder(threading.Thread): args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) LOGGER.warning(MSG.format(*args)) - def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: + async def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: for topology_uuid, topology_id in self.topology_cache.items(): - details = self.context_client.GetTopologyDetails(topology_id) + details = await self.context_client.GetTopologyDetails(topology_id) for device in details.devices: if device.device_id == device_id: return topology_id return None - def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: + async def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: for topology_uuid, topology_id in self.topology_cache.items(): - details = self.context_client.GetTopologyDetails(topology_id) + details = await self.context_client.GetTopologyDetails(topology_id) for link in details.links: if link.link_id == link_id: return topology_id return None - def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: + async def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: device_id = event.device_id - device = self.context_client.GetDevice(device_id) - topology_id = self.find_topology_for_device(device_id) + device = await self.context_client.GetDevice(device_id) + topology_id = await self.find_topology_for_device(device_id) if topology_id: LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id))) @@ -156,10 +151,10 @@ class DLTRecorder(threading.Thread): else: LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") - def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: + async def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: link_id = event.link_id - link = self.context_client.GetLink(link_id) - topology_id = self.find_topology_for_link(link_id) + link = await self.context_client.GetLink(link_id) + topology_id = await self.find_topology_for_link(link_id) if topology_id: dlt_record_sender.add_link(topology_id, link) else: