diff --git a/src/dlt/connector/service/tools/Checkers.py b/src/dlt/connector/service/tools/Checkers.py index 6ad0f4b82626740c594829831b08fcefbc15096d..94a10d4096b4acb1756ad1c7297faa9c96e07880 100644 --- a/src/dlt/connector/service/tools/Checkers.py +++ b/src/dlt/connector/service/tools/Checkers.py @@ -20,5 +20,5 @@ def record_exists(record : DltRecord) -> bool: exists = exists and (record.record_id.type != DLTRECORDTYPE_UNDEFINED) exists = exists and (len(record.record_id.record_uuid.uuid) > 0) #exists = exists and (record.operation != DLTRECORDOPERATION_UNDEFINED) - exists = exists and (len(record.data_json) > 0) + #exists = exists and (len(record.data_json) > 0) return exists diff --git a/src/interdomain/service/topology_abstractor/DltRecorderOld.py b/src/interdomain/service/topology_abstractor/DltRecorderOld.py deleted file mode 100644 index cb8c194eb0f100b5fe11c523a4618cf41891213b..0000000000000000000000000000000000000000 --- a/src/interdomain/service/topology_abstractor/DltRecorderOld.py +++ /dev/null @@ -1,133 +0,0 @@ -import logging -import threading -from typing import Dict, Optional - -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name -from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent -from common.tools.context_queries.Context import create_context -from common.tools.context_queries.Device import get_uuids_of_devices_in_topology -from common.tools.context_queries.Topology import create_missing_topologies -from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.Context import json_context_id -from common.tools.object_factory.Device import json_device_id -from common.tools.object_factory.Topology import json_topology_id -from context.client.ContextClient import ContextClient -from context.client.EventsCollector import EventsCollector -from dlt.connector.client.DltConnectorClient import DltConnectorClient -from .DltRecordSender import DltRecordSender -from .Types import EventTypes - -LOGGER = logging.getLogger(__name__) - -ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) -INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) - - -class DLTRecorder(threading.Thread): - def __init__(self) -> None: - super().__init__(daemon=True) - self.terminate = threading.Event() - self.context_client = ContextClient() - self.context_event_collector = EventsCollector(self.context_client) - - def stop(self): - self.terminate.set() - - def run(self) -> None: - self.context_client.connect() - create_context(self.context_client, DEFAULT_CONTEXT_NAME) - self.create_topologies() - self.context_event_collector.start() - - while not self.terminate.is_set(): - event = self.context_event_collector.get_event(timeout=0.1) - if event is None: - continue - LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - self.update_record(event) - - self.context_event_collector.stop() - self.context_client.close() - - def create_topologies(self): - topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] - create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) - - def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: - env_vars = find_environment_variables([ - get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), - get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ]) - if len(env_vars) == 2: - dlt_connector_client = DltConnectorClient() - dlt_connector_client.connect() - return dlt_connector_client - return None - - def update_record(self, event: EventTypes) -> None: - dlt_connector_client = self.get_dlt_connector_client() - dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) - - if isinstance(event, ContextEvent): - LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) - LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) - - elif isinstance(event, TopologyEvent): - LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_topology_event(event, dlt_record_sender) - - elif isinstance(event, DeviceEvent): - LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_device_event(event, dlt_record_sender) - - elif isinstance(event, LinkEvent): - LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) - self.process_link_event(event, dlt_record_sender) - - else: - LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - - dlt_record_sender.commit() - if dlt_connector_client is not None: - dlt_connector_client.close() - - def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: - topology_id = event.topology_id - topology_uuid = topology_id.topology_uuid.uuid - context_id = topology_id.context_id - context_uuid = context_id.context_uuid.uuid - topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} - - context = self.context_client.GetContext(context_id) - context_name = context.name - - topology_details = self.context_client.GetTopologyDetails(topology_id) - topology_name = topology_details.name - - if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ - (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): - for device in topology_details.devices: - dlt_record_sender.add_device(topology_id, device) - - for link in topology_details.links: - dlt_record_sender.add_link(topology_id, link) - - else: - MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' - args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) - LOGGER.warning(MSG.format(*args)) - - -#Check which ID to use. - - def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: - device_id = event.device_id - device_uuid = device_id.device_uuid.uuid - device = self.context_client.GetDevice(device_id) - dlt_record_sender.add_device(device_id.context_id, device) - - def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: - link_id = event.link_id - link = self.context_client.GetLink(link_id) - dlt_record_sender.add_link(link_id.context_id, link)