# DltRecorder.py
import logging
import threading
from typing import Dict, Optional

from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
# Protobuf-generated Context message types (events, entities and their identifiers).
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .DltRecordSender import DltRecordSender
from .Types import EventTypes

LOGGER = logging.getLogger(__name__)
# NOTE(review): the level is forced to DEBUG for this module's logger,
# regardless of what the application configures -- confirm this is intended
# outside of testing.
LOGGER.setLevel(logging.DEBUG)

# Identifier of the default context, built from DEFAULT_CONTEXT_NAME.
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
# Identifier of the inter-domain topology inside the default context.
INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID))


class DLTRecorder(threading.Thread):
    """Daemon thread that forwards Context events to a ``DltRecordSender``.

    The thread pulls events from an ``EventsCollector`` bound to a
    ``ContextClient``. Topology, device and link events are translated into
    ``add_device`` / ``add_link`` calls on a per-event ``DltRecordSender``,
    which is then committed. Context events and unsupported event types are
    only logged.
    """

    def __init__(self) -> None:
        super().__init__(daemon=True)
        # Set by stop(); polled by run() to leave the event loop.
        self.terminate = threading.Event()
        self.context_client = ContextClient()
        self.context_event_collector = EventsCollector(self.context_client)
        # Topology UUID -> TopologyId for every topology seen in a
        # TopologyEvent; used to look up the topology of a device or link.
        self.topology_cache: Dict[str, TopologyId] = {}

    def stop(self):
        """Ask the event loop in :meth:`run` to exit."""
        self.terminate.set()

    def run(self) -> None:
        """Connect to Context, ensure default entities exist and process events.

        The loop polls the collector with a short timeout so that a call to
        :meth:`stop` is noticed promptly. The collector is stopped and the
        context client closed even when event processing raises.
        """
        self.context_client.connect()
        try:
            create_context(self.context_client, DEFAULT_CONTEXT_NAME)
            self.create_topologies()
            self.context_event_collector.start()
            try:
                while not self.terminate.is_set():
                    event = self.context_event_collector.get_event(timeout=0.1)
                    if event is None:
                        continue
                    LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
                    self.update_record(event)
            finally:
                # Only stop the collector once it has actually been started.
                self.context_event_collector.stop()
        finally:
            self.context_client.close()

    def create_topologies(self):
        """Create the default and inter-domain topologies if they are missing."""
        topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
        create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)

    def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
        """Return a newly created and connected ``DltConnectorClient``.

        The caller is responsible for closing the returned client.
        """
        # Always enable DLT for testing
        # NOTE(review): the environment-based gating below is disabled, so this
        # method currently never returns None. Re-enable it (and drop the
        # unconditional path) once testing is finished.
        # env_vars = find_environment_variables([
        #     get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST),
        #     get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
        # ])
        # if len(env_vars) == 2:
        #     dlt_connector_client = DltConnectorClient()
        #     dlt_connector_client.connect()
        #     return dlt_connector_client
        # return None
        dlt_connector_client = DltConnectorClient()
        dlt_connector_client.connect()
        return dlt_connector_client

    def update_record(self, event: EventTypes) -> None:
        """Dispatch ``event`` to its handler and commit the resulting records.

        A DLT connector client is obtained for the duration of the call and is
        always closed afterwards, including when a handler or the commit
        raises. The commit is only attempted when processing succeeded.
        """
        dlt_connector_client = self.get_dlt_connector_client()
        try:
            dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
            # Serialise once; the same text is used by every log line below.
            event_json = grpc_message_to_json_string(event)

            if isinstance(event, ContextEvent):
                LOGGER.debug('Processing ContextEvent({:s})'.format(event_json))
                LOGGER.warning('Ignoring ContextEvent({:s})'.format(event_json))

            elif isinstance(event, TopologyEvent):
                LOGGER.debug('Processing TopologyEvent({:s})'.format(event_json))
                self.process_topology_event(event, dlt_record_sender)

            elif isinstance(event, DeviceEvent):
                LOGGER.debug('Processing DeviceEvent({:s})'.format(event_json))
                self.process_device_event(event, dlt_record_sender)

            elif isinstance(event, LinkEvent):
                LOGGER.debug('Processing LinkEvent({:s})'.format(event_json))
                self.process_link_event(event, dlt_record_sender)

            else:
                LOGGER.warning('Unsupported Event({:s})'.format(event_json))

            dlt_record_sender.commit()
        finally:
            if dlt_connector_client is not None:
                dlt_connector_client.close()

    def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
        """Register the devices and links of the event's topology.

        The topology is remembered in ``topology_cache`` in every case. Its
        devices and links are only added to ``dlt_record_sender`` when the
        topology belongs to the default context and is neither the default nor
        the inter-domain topology (matched by UUID or by name); otherwise the
        event is logged and ignored.
        """
        topology_id = event.topology_id
        topology_uuid = topology_id.topology_uuid.uuid
        context_id = topology_id.context_id
        context_uuid = context_id.context_uuid.uuid
        excluded_topologies = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}

        context = self.context_client.GetContext(context_id)
        context_name = context.name

        topology_details = self.context_client.GetTopologyDetails(topology_id)
        topology_name = topology_details.name

        self.topology_cache[topology_uuid] = topology_id

        LOGGER.debug('TOPOLOGY Details({:s})'.format(grpc_message_to_json_string(topology_details)))

        in_default_context = DEFAULT_CONTEXT_NAME in (context_uuid, context_name)
        is_excluded = (topology_uuid in excluded_topologies) or (topology_name in excluded_topologies)

        if not in_default_context or is_excluded:
            MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})'
            args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
            LOGGER.warning(MSG.format(*args))
            return

        for device in topology_details.devices:
            LOGGER.debug('DEVICE INFO({:s})'.format(grpc_message_to_json_string(device)))
            dlt_record_sender.add_device(topology_id, device)

        for link in topology_details.links:
            dlt_record_sender.add_link(topology_id, link)

    def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
        """Return the first cached topology whose details list ``device_id``.

        Issues one ``GetTopologyDetails`` request per cached topology until a
        match is found. Returns ``None`` when no cached topology contains the
        device.
        """
        for topology_id in self.topology_cache.values():
            details = self.context_client.GetTopologyDetails(topology_id)
            for device in details.devices:
                if device.device_id == device_id:
                    return topology_id
        return None

    def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
        """Return the first cached topology whose details list ``link_id``.

        Issues one ``GetTopologyDetails`` request per cached topology until a
        match is found. Returns ``None`` when no cached topology contains the
        link.
        """
        for topology_id in self.topology_cache.values():
            details = self.context_client.GetTopologyDetails(topology_id)
            for link in details.links:
                if link.link_id == link_id:
                    return topology_id
        return None

    def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
        """Register the event's device under the cached topology containing it.

        The device is fetched from Context first; if no cached topology lists
        it, a warning is logged and nothing is added.
        """
        device_id = event.device_id
        device = self.context_client.GetDevice(device_id)
        topology_id = self.find_topology_for_device(device_id)
        # Explicit None check: the lookup returns Optional[TopologyId] and the
        # decision must not depend on the truthiness of a message object.
        if topology_id is None:
            LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
            return

        LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(grpc_message_to_json_string(device), str(device_id)))
        dlt_record_sender.add_device(topology_id, device)

    def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
        """Register the event's link under the cached topology containing it.

        The link is fetched from Context first; if no cached topology lists
        it, a warning is logged and nothing is added.
        """
        link_id = event.link_id
        link = self.context_client.GetLink(link_id)
        topology_id = self.find_topology_for_link(link_id)
        if topology_id is None:
            LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}")
            return

        dlt_record_sender.add_link(topology_id, link)