Newer
Older
import logging
import threading
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .DltRecordSender import DltRecordSender
from .Types import EventTypes
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Identifier of the context named DEFAULT_CONTEXT_NAME; built once at import time.
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
# Identifier of the topology named INTERDOMAIN_TOPOLOGY_NAME inside ADMIN_CONTEXT_ID.
INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID))
class DLTRecorder(threading.Thread):
def __init__(self) -> None:
    """Create the recorder as a daemon thread with its Context client objects.

    Besides the termination flag, the Context client and the events
    collector, this also creates the topology cache that
    find_topology_for_device() and find_topology_for_link() iterate over.
    """
    super().__init__(daemon=True)
    # Set by stop(); polled by the loop in run().
    self.terminate = threading.Event()
    self.context_client = ContextClient()
    self.context_event_collector = EventsCollector(self.context_client)
    # Maps a topology UUID to its TopologyId. It must exist before the first
    # Device/Link event is handled, otherwise the find_topology_for_*()
    # lookups fail with AttributeError.
    # NOTE(review): nothing in the visible part of this class adds entries to
    # the cache -- confirm where it is meant to be populated.
    self.topology_cache: Dict[str, TopologyId] = {}
def stop(self):
self.terminate.set()
def run(self) -> None:
    """Thread body: forward Context events to update_record() until stopped.

    Connects the Context client, calls create_context() for
    DEFAULT_CONTEXT_NAME, calls create_topologies(), starts the events
    collector and then polls it (0.1 s timeout) until the terminate event
    is set. Every non-None event is logged and handed to update_record().

    The events collector is stopped and the Context client is closed even
    when setup or event processing raises; the exception still propagates.
    """
    self.context_client.connect()
    try:
        create_context(self.context_client, DEFAULT_CONTEXT_NAME)
        self.create_topologies()
        self.context_event_collector.start()
        try:
            while not self.terminate.is_set():
                event = self.context_event_collector.get_event(timeout=0.1)
                if event is None:
                    # Timed out without an event; re-check the terminate flag.
                    continue
                LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
                self.update_record(event)
        finally:
            # Only reached once start() succeeded, so stop() is always paired.
            self.context_event_collector.stop()
    finally:
        self.context_client.close()
def create_topologies(self):
    """Call create_missing_topologies() for the default and inter-domain topology names.

    Both names are passed together with ADMIN_CONTEXT_ID and the Context client.
    """
    create_missing_topologies(
        self.context_client,
        ADMIN_CONTEXT_ID,
        [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME],
    )
def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
    """Return a new DLT connector client on which connect() has been called.

    The environment-variable gate shown in the disabled code below is
    currently bypassed, so this always returns a client and never None.
    update_record() closes the client it obtains from here.
    """
    # Always enable DLT for testing
    connector = DltConnectorClient()
    connector.connect()
    return connector
    # Disabled environment-variable gate, kept for reference:
    # env_vars = find_environment_variables([
    # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST),
    # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
    # ])
    # if len(env_vars) == 2:
    # dlt_connector_client = DltConnectorClient()
    # dlt_connector_client.connect()
    # return dlt_connector_client
    # return None
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def update_record(self, event: EventTypes) -> None:
    """Dispatch one Context event to its handler and commit the DLT records.

    A DLT connector client is obtained for this single event and wrapped in
    a DltRecordSender. Topology, Device and Link events go to their
    process_*_event() handler; Context events and unsupported event types
    are only logged with a warning. The sender is committed afterwards.

    The connector client is closed in a ``finally`` clause so it is released
    even when a handler or commit() raises; the exception still propagates.
    """
    dlt_connector_client = self.get_dlt_connector_client()
    try:
        dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
        # Serialise the event once; every branch below logs it.
        event_json = grpc_message_to_json_string(event)

        if isinstance(event, ContextEvent):
            LOGGER.debug('Processing ContextEvent({:s})'.format(event_json))
            LOGGER.warning('Ignoring ContextEvent({:s})'.format(event_json))
        elif isinstance(event, TopologyEvent):
            LOGGER.debug('Processing TopologyEvent({:s})'.format(event_json))
            self.process_topology_event(event, dlt_record_sender)
        elif isinstance(event, DeviceEvent):
            LOGGER.debug('Processing DeviceEvent({:s})'.format(event_json))
            self.process_device_event(event, dlt_record_sender)
        elif isinstance(event, LinkEvent):
            LOGGER.debug('Processing LinkEvent({:s})'.format(event_json))
            self.process_link_event(event, dlt_record_sender)
        else:
            LOGGER.warning('Unsupported Event({:s})'.format(event_json))

        dlt_record_sender.commit()
    finally:
        # get_dlt_connector_client() is annotated Optional, so keep the None guard.
        if dlt_connector_client is not None:
            dlt_connector_client.close()
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
    """Queue every device and link of the event's topology on the record sender.

    The topology is processed only when its context matches
    DEFAULT_CONTEXT_NAME (by UUID or by name) and the topology itself is
    neither the default nor the inter-domain topology (by UUID or by name).
    Any other topology event is logged with a warning and ignored.
    """
    topology_id = event.topology_id
    context_id = topology_id.context_id
    topology_uuid = topology_id.topology_uuid.uuid
    context_uuid = context_id.context_uuid.uuid

    # Same call order as before: the context first, then the topology details.
    context_name = self.context_client.GetContext(context_id).name
    topology_details = self.context_client.GetTopologyDetails(topology_id)
    topology_name = topology_details.name
    LOGGER.debug('TOPOLOGY Details({:s})'.format(grpc_message_to_json_string(topology_details)))

    excluded_names = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
    in_default_context = (context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)
    is_excluded = (topology_uuid in excluded_names) or (topology_name in excluded_names)

    if (not in_default_context) or is_excluded:
        MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})'
        LOGGER.warning(MSG.format(
            context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
        ))
        return

    for device in topology_details.devices:
        LOGGER.debug('DEVICE INFO({:s})'.format(grpc_message_to_json_string(device)))
        dlt_record_sender.add_device(topology_id, device)

    for link in topology_details.links:
        dlt_record_sender.add_link(topology_id, link)
def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for device in details.devices:
if device.device_id == device_id:
return topology_id
return None
def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for link in details.links:
if link.link_id == link_id:
return topology_id
return None
def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
    """Queue the event's device on the record sender under its cached topology.

    The device is fetched with GetDevice() and its topology is looked up
    with find_topology_for_device(). When no topology is found, a warning
    is logged and nothing is queued.
    """
    device_id = event.device_id
    # The device is fetched before the topology lookup, as in the original order.
    device = self.context_client.GetDevice(device_id)
    topology_id = self.find_topology_for_device(device_id)

    if not topology_id:
        LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
        return

    LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(grpc_message_to_json_string(device), str(device_id)))
    dlt_record_sender.add_device(topology_id, device)
def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
link_id = event.link_id
link = self.context_client.GetLink(link_id)