diff --git a/my_deploy.sh b/my_deploy.sh index d18c5db7fd502276cbeaea859a421673c77d6146..c0f2196c8db4b73c65fdbf31e9d6589f3cbf950b 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator dlt" +export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator interdomain dlt" # Uncomment to activate Monitoring #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" diff --git a/src/interdomain/Config.py b/src/interdomain/Config.py index 918f60d79d51ee8f9fe3875b630de2c373c01a5a..d9098447bb107a878acf297d66eaf1573d72691d 100644 --- a/src/interdomain/Config.py +++ b/src/interdomain/Config.py @@ -15,6 +15,7 @@ from common.Settings import get_setting SETTING_NAME_TOPOLOGY_ABSTRACTOR = 'TOPOLOGY_ABSTRACTOR' +SETTING_NAME_DLT_INTEGRATION = 'DLT_INTEGRATION' TRUE_VALUES = {'Y', 'YES', 'TRUE', 'T', 'E', 'ENABLE', 'ENABLED'} def is_topology_abstractor_enabled() -> bool: @@ -22,3 +23,9 @@ def is_topology_abstractor_enabled() -> bool: if is_enabled is None: return False str_is_enabled = str(is_enabled).upper() return str_is_enabled in TRUE_VALUES + +def is_dlt_enabled() -> bool: + is_enabled = get_setting(SETTING_NAME_DLT_INTEGRATION, default=None) + if is_enabled is None: return False + str_is_enabled = str(is_enabled).upper() + return str_is_enabled in TRUE_VALUES diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index c0497bd2902080f8b967ba50ce370a4c9b711689..a0a25ae1ed5c21075ed46209ac3d3c868076ccb2 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -18,8 +18,9 @@ from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables) -from interdomain.Config import is_topology_abstractor_enabled -from .topology_abstractor.TopologyAbstractor import TopologyAbstractor +from interdomain.Config import is_dlt_enabled +#from .topology_abstractor.TopologyAbstractor import TopologyAbstractor +from .topology_abstractor.DltRecorder import DLTRecorder from .InterdomainService import InterdomainService from .RemoteDomainClients import RemoteDomainClients @@ -64,17 +65,25 @@ def main(): grpc_service.start() # Subscribe to Context Events - topology_abstractor_enabled = is_topology_abstractor_enabled() - if topology_abstractor_enabled: - topology_abstractor = TopologyAbstractor() - topology_abstractor.start() + # topology_abstractor_enabled = is_topology_abstractor_enabled() + # if topology_abstractor_enabled: + # topology_abstractor = TopologyAbstractor() + # topology_abstractor.start() + + # Subscribe to Context Events + dlt_enabled = is_dlt_enabled() + if dlt_enabled: + dlt_recorder = DLTRecorder() + dlt_recorder.start() # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass LOGGER.info('Terminating...') - if topology_abstractor_enabled: - topology_abstractor.stop() + # if topology_abstractor_enabled: + # topology_abstractor.stop() + if dlt_enabled: + dlt_recorder.stop() grpc_service.stop() remote_domain_clients.stop() diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py new file mode 100644 index 0000000000000000000000000000000000000000..cb8c194eb0f100b5fe11c523a4618cf41891213b --- /dev/null +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -0,0 +1,133 @@ +import logging +import threading +from typing import Dict, Optional + +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name +from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent +from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import get_uuids_of_devices_in_topology +from common.tools.context_queries.Topology import create_missing_topologies +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Device import json_device_id +from common.tools.object_factory.Topology import json_topology_id +from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector +from dlt.connector.client.DltConnectorClient import DltConnectorClient +from .DltRecordSender import DltRecordSender +from .Types import EventTypes + +LOGGER = logging.getLogger(__name__) + +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) +INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) + + +class DLTRecorder(threading.Thread): + def __init__(self) -> None: + super().__init__(daemon=True) + self.terminate = threading.Event() + self.context_client = ContextClient() + self.context_event_collector = EventsCollector(self.context_client) + + def stop(self): + self.terminate.set() + + def run(self) -> None: + self.context_client.connect() + create_context(self.context_client, DEFAULT_CONTEXT_NAME) + self.create_topologies() + self.context_event_collector.start() + + while not self.terminate.is_set(): + event = self.context_event_collector.get_event(timeout=0.1) + if event is None: + continue + LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) + self.update_record(event) + + self.context_event_collector.stop() + self.context_client.close() + + def create_topologies(self): + topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] + create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + + def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: + env_vars = find_environment_variables([ + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ]) + if len(env_vars) == 2: + dlt_connector_client = DltConnectorClient() + dlt_connector_client.connect() + return dlt_connector_client + return None + + def update_record(self, event: EventTypes) -> None: + dlt_connector_client = self.get_dlt_connector_client() + dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) + + if isinstance(event, ContextEvent): + LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + + elif isinstance(event, TopologyEvent): + LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_topology_event(event, dlt_record_sender) + + elif isinstance(event, DeviceEvent): + LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_device_event(event, dlt_record_sender) + + elif isinstance(event, LinkEvent): + LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) + self.process_link_event(event, dlt_record_sender) + + else: + LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) + + dlt_record_sender.commit() + if dlt_connector_client is not None: + dlt_connector_client.close() + + def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: + topology_id = event.topology_id + topology_uuid = topology_id.topology_uuid.uuid + context_id = topology_id.context_id + context_uuid = context_id.context_uuid.uuid + topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} + + context = self.context_client.GetContext(context_id) + context_name = context.name + + topology_details = self.context_client.GetTopologyDetails(topology_id) + topology_name = topology_details.name + + if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ + (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + for device in topology_details.devices: + dlt_record_sender.add_device(topology_id, device) + + for link in topology_details.links: + dlt_record_sender.add_link(topology_id, link) + + else: + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + + +#Check which ID to use. + + def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: + device_id = event.device_id + device_uuid = device_id.device_uuid.uuid + device = self.context_client.GetDevice(device_id) + dlt_record_sender.add_device(device_id.context_id, device) + + def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: + link_id = event.link_id + link = self.context_client.GetLink(link_id) + dlt_record_sender.add_link(link_id.context_id, link)