Skip to content
Snippets Groups Projects
Commit 2f938758 authored by Javier Diaz's avatar Javier Diaz
Browse files

debugging

parent ebb001db
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
# limitations under the License. # limitations under the License.
import logging, signal, sys, threading import logging, signal, sys, threading
import asyncio
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
...@@ -27,6 +29,12 @@ from .RemoteDomainClients import RemoteDomainClients ...@@ -27,6 +29,12 @@ from .RemoteDomainClients import RemoteDomainClients
terminate = threading.Event() terminate = threading.Event()
LOGGER : logging.Logger = None LOGGER : logging.Logger = None
async def run_dlt_recorder(dlt_recorder):
await dlt_recorder.start()
await terminate.wait()
await dlt_recorder.stop()
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received')
terminate.set() terminate.set()
...@@ -76,7 +84,9 @@ def main(): ...@@ -76,7 +84,9 @@ def main():
if dlt_enabled: if dlt_enabled:
LOGGER.info('Starting DLT functionality...') LOGGER.info('Starting DLT functionality...')
dlt_recorder = DLTRecorder() dlt_recorder = DLTRecorder()
dlt_recorder.start() #dlt_recorder.start()
loop = asyncio.get_event_loop()
loop.run_until_complete(run_dlt_recorder(dlt_recorder))
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass
...@@ -85,7 +95,9 @@ def main(): ...@@ -85,7 +95,9 @@ def main():
# if topology_abstractor_enabled: # if topology_abstractor_enabled:
# topology_abstractor.stop() # topology_abstractor.stop()
if dlt_enabled: if dlt_enabled:
dlt_recorder.stop() #dlt_recorder.stop()
loop.run_until_complete(dlt_recorder.stop())
loop.close()
grpc_service.stop() grpc_service.stop()
remote_domain_clients.stop() remote_domain_clients.stop()
......
import logging
import threading
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .DltRecordSender import DltRecordSender
from .Types import EventTypes
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID))
class DLTRecorder(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self.terminate = threading.Event()
self.context_client = ContextClient()
self.context_event_collector = EventsCollector(self.context_client)
self.topology_cache: Dict[str, TopologyId] = {}
def stop(self):
self.terminate.set()
def run(self) -> None:
self.context_client.connect()
create_context(self.context_client, DEFAULT_CONTEXT_NAME)
self.create_topologies()
self.context_event_collector.start()
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event is None:
continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
self.update_record(event)
self.context_event_collector.stop()
self.context_client.close()
def create_topologies(self):
topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
# Always enable DLT for testing
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
return dlt_connector_client
# env_vars = find_environment_variables([
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST),
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
# ])
# if len(env_vars) == 2:
# dlt_connector_client = DltConnectorClient()
# dlt_connector_client.connect()
# return dlt_connector_client
# return None
def update_record(self, event: EventTypes) -> None:
dlt_connector_client = self.get_dlt_connector_client()
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
if isinstance(event, ContextEvent):
LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
elif isinstance(event, TopologyEvent):
LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_topology_event(event, dlt_record_sender)
elif isinstance(event, DeviceEvent):
LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_device_event(event, dlt_record_sender)
elif isinstance(event, LinkEvent):
LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_link_event(event, dlt_record_sender)
else:
LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
dlt_record_sender.commit()
if dlt_connector_client is not None:
dlt_connector_client.close()
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
topology_id = event.topology_id
topology_uuid = topology_id.topology_uuid.uuid
context_id = topology_id.context_id
context_uuid = context_id.context_uuid.uuid
topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
context = self.context_client.GetContext(context_id)
context_name = context.name
topology_details = self.context_client.GetTopologyDetails(topology_id)
topology_name = topology_details.name
self.topology_cache[topology_uuid] = topology_id
LOGGER.debug('TOPOLOGY Details({:s})'.format(grpc_message_to_json_string(topology_details)))
if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \
(topology_uuid not in topology_uuids) and (topology_name not in topology_uuids):
for device in topology_details.devices:
LOGGER.debug('DEVICE_INFO_TOPO({:s})'.format(grpc_message_to_json_string(device)))
dlt_record_sender.add_device(topology_id, device)
for link in topology_details.links:
dlt_record_sender.add_link(topology_id, link)
else:
MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})'
args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
LOGGER.warning(MSG.format(*args))
def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for device in details.devices:
if device.device_id == device_id:
return topology_id
return None
def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for link in details.links:
if link.link_id == link_id:
return topology_id
return None
def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
device_id = event.device_id
device = self.context_client.GetDevice(device_id)
topology_id = self.find_topology_for_device(device_id)
if topology_id:
LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id)))
dlt_record_sender.add_device(topology_id, device)
else:
LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
link_id = event.link_id
link = self.context_client.GetLink(link_id)
topology_id = self.find_topology_for_link(link_id)
if topology_id:
dlt_record_sender.add_link(topology_id, link)
else:
LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}")
# DltRecorder.py
import logging import logging
import threading import asyncio
from typing import Dict, Optional from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, EndPointId, Link, LinkId, LinkEvent, TopologyId, TopologyEvent
from common.tools.context_queries.Context import create_context from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology from common.tools.context_queries.Device import get_uuids_of_devices_in_topology
from common.tools.context_queries.Topology import create_missing_topologies from common.tools.context_queries.Topology import create_missing_topologies
...@@ -25,54 +27,47 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) ...@@ -25,54 +27,47 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID)) INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID))
class DLTRecorder(threading.Thread): class DLTRecorder:
def __init__(self) -> None: def __init__(self) -> None:
super().__init__(daemon=True) self.terminate_event = asyncio.Event()
self.terminate = threading.Event()
self.context_client = ContextClient() self.context_client = ContextClient()
self.context_event_collector = EventsCollector(self.context_client) self.context_event_collector = EventsCollector(self.context_client)
self.topology_cache: Dict[str, TopologyId] = {} self.topology_cache: Dict[str, TopologyId] = {}
def stop(self): async def stop(self):
self.terminate.set() self.terminate_event.set()
async def start(self) -> None:
await self.run()
def run(self) -> None: async def run(self) -> None:
self.context_client.connect() await self.context_client.connect()
create_context(self.context_client, DEFAULT_CONTEXT_NAME) create_context(self.context_client, DEFAULT_CONTEXT_NAME)
self.create_topologies() self.create_topologies()
self.context_event_collector.start() await self.context_event_collector.start()
while not self.terminate.is_set(): while not self.terminate_event.is_set():
event = self.context_event_collector.get_event(timeout=0.1) event = await self.context_event_collector.get_event(timeout=0.1)
if event is None: if event is None:
continue continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
self.update_record(event) await self.update_record(event)
self.context_event_collector.stop() await self.context_event_collector.stop()
self.context_client.close() await self.context_client.close()
def create_topologies(self): def create_topologies(self):
topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: async def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
# Always enable DLT for testing # Always enable DLT for testing
dlt_connector_client = DltConnectorClient() dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect() await dlt_connector_client.connect()
return dlt_connector_client return dlt_connector_client
# env_vars = find_environment_variables([
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), async def update_record(self, event: EventTypes) -> None:
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), dlt_connector_client = await self.get_dlt_connector_client()
# ])
# if len(env_vars) == 2:
# dlt_connector_client = DltConnectorClient()
# dlt_connector_client.connect()
# return dlt_connector_client
# return None
def update_record(self, event: EventTypes) -> None:
dlt_connector_client = self.get_dlt_connector_client()
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
if isinstance(event, ContextEvent): if isinstance(event, ContextEvent):
...@@ -81,34 +76,34 @@ class DLTRecorder(threading.Thread): ...@@ -81,34 +76,34 @@ class DLTRecorder(threading.Thread):
elif isinstance(event, TopologyEvent): elif isinstance(event, TopologyEvent):
LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_topology_event(event, dlt_record_sender) await self.process_topology_event(event, dlt_record_sender)
elif isinstance(event, DeviceEvent): elif isinstance(event, DeviceEvent):
LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_device_event(event, dlt_record_sender) await self.process_device_event(event, dlt_record_sender)
elif isinstance(event, LinkEvent): elif isinstance(event, LinkEvent):
LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_link_event(event, dlt_record_sender) await self.process_link_event(event, dlt_record_sender)
else: else:
LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
dlt_record_sender.commit() await dlt_record_sender.commit()
if dlt_connector_client is not None: if dlt_connector_client is not None:
dlt_connector_client.close() await dlt_connector_client.close()
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: async def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
topology_id = event.topology_id topology_id = event.topology_id
topology_uuid = topology_id.topology_uuid.uuid topology_uuid = topology_id.topology_uuid.uuid
context_id = topology_id.context_id context_id = topology_id.context_id
context_uuid = context_id.context_uuid.uuid context_uuid = context_id.context_uuid.uuid
topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
context = self.context_client.GetContext(context_id) context = await self.context_client.GetContext(context_id)
context_name = context.name context_name = context.name
topology_details = self.context_client.GetTopologyDetails(topology_id) topology_details = await self.context_client.GetTopologyDetails(topology_id)
topology_name = topology_details.name topology_name = topology_details.name
self.topology_cache[topology_uuid] = topology_id self.topology_cache[topology_uuid] = topology_id
...@@ -129,26 +124,26 @@ class DLTRecorder(threading.Thread): ...@@ -129,26 +124,26 @@ class DLTRecorder(threading.Thread):
args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
LOGGER.warning(MSG.format(*args)) LOGGER.warning(MSG.format(*args))
def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]: async def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items(): for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id) details = await self.context_client.GetTopologyDetails(topology_id)
for device in details.devices: for device in details.devices:
if device.device_id == device_id: if device.device_id == device_id:
return topology_id return topology_id
return None return None
def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]: async def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items(): for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id) details = await self.context_client.GetTopologyDetails(topology_id)
for link in details.links: for link in details.links:
if link.link_id == link_id: if link.link_id == link_id:
return topology_id return topology_id
return None return None
def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None: async def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
device_id = event.device_id device_id = event.device_id
device = self.context_client.GetDevice(device_id) device = await self.context_client.GetDevice(device_id)
topology_id = self.find_topology_for_device(device_id) topology_id = await self.find_topology_for_device(device_id)
if topology_id: if topology_id:
LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id))) LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id)))
...@@ -156,10 +151,10 @@ class DLTRecorder(threading.Thread): ...@@ -156,10 +151,10 @@ class DLTRecorder(threading.Thread):
else: else:
LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}") LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None: async def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
link_id = event.link_id link_id = event.link_id
link = self.context_client.GetLink(link_id) link = await self.context_client.GetLink(link_id)
topology_id = self.find_topology_for_link(link_id) topology_id = await self.find_topology_for_link(link_id)
if topology_id: if topology_id:
dlt_record_sender.add_link(topology_id, link) dlt_record_sender.add_link(topology_id, link)
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment