Skip to content
Snippets Groups Projects
Commit ed1af500 authored by Javier Diaz's avatar Javier Diaz
Browse files

Integrating DLT into Interdomain module

parent f4fb7d70
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
......@@ -20,7 +20,7 @@
export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/"
# Set the list of components, separated by spaces, you want to build images for, and deploy.
export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator dlt"
export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator interdomain dlt"
# Uncomment to activate Monitoring
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
......
......@@ -15,6 +15,7 @@
from common.Settings import get_setting
SETTING_NAME_TOPOLOGY_ABSTRACTOR = 'TOPOLOGY_ABSTRACTOR'
SETTING_NAME_DLT_INTEGRATION = 'DLT_INTEGRATION'
TRUE_VALUES = {'Y', 'YES', 'TRUE', 'T', 'E', 'ENABLE', 'ENABLED'}
def is_topology_abstractor_enabled() -> bool:
......@@ -22,3 +23,9 @@ def is_topology_abstractor_enabled() -> bool:
if is_enabled is None: return False
str_is_enabled = str(is_enabled).upper()
return str_is_enabled in TRUE_VALUES
def is_dlt_enabled() -> bool:
is_enabled = get_setting(SETTING_NAME_DLT_INTEGRATION, default=None)
if is_enabled is None: return False
str_is_enabled = str(is_enabled).upper()
return str_is_enabled in TRUE_VALUES
......@@ -18,8 +18,9 @@ from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables)
from interdomain.Config import is_topology_abstractor_enabled
from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
from interdomain.Config import is_dlt_enabled
#from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
from .topology_abstractor.DltRecorder import DLTRecorder
from .InterdomainService import InterdomainService
from .RemoteDomainClients import RemoteDomainClients
......@@ -64,17 +65,25 @@ def main():
grpc_service.start()
# Subscribe to Context Events
topology_abstractor_enabled = is_topology_abstractor_enabled()
if topology_abstractor_enabled:
topology_abstractor = TopologyAbstractor()
topology_abstractor.start()
# topology_abstractor_enabled = is_topology_abstractor_enabled()
# if topology_abstractor_enabled:
# topology_abstractor = TopologyAbstractor()
# topology_abstractor.start()
# Subscribe to Context Events
dlt_enabled = is_dlt_enabled()
if dlt_enabled:
dlt_recorder = DLTRecorder()
dlt_recorder.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
if topology_abstractor_enabled:
topology_abstractor.stop()
# if topology_abstractor_enabled:
# topology_abstractor.stop()
if dlt_enabled:
dlt_recorder.stop()
grpc_service.stop()
remote_domain_clients.stop()
......
import logging
import threading
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .DltRecordSender import DltRecordSender
from .Types import EventTypes
LOGGER = logging.getLogger(__name__)
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
INTERDOMAIN_TOPOLOGY_ID = TopologyId(**json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id=ADMIN_CONTEXT_ID))
class DLTRecorder(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self.terminate = threading.Event()
self.context_client = ContextClient()
self.context_event_collector = EventsCollector(self.context_client)
def stop(self):
self.terminate.set()
def run(self) -> None:
self.context_client.connect()
create_context(self.context_client, DEFAULT_CONTEXT_NAME)
self.create_topologies()
self.context_event_collector.start()
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event is None:
continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
self.update_record(event)
self.context_event_collector.stop()
self.context_client.close()
def create_topologies(self):
topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
env_vars = find_environment_variables([
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
if len(env_vars) == 2:
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
return dlt_connector_client
return None
def update_record(self, event: EventTypes) -> None:
dlt_connector_client = self.get_dlt_connector_client()
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
if isinstance(event, ContextEvent):
LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
elif isinstance(event, TopologyEvent):
LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_topology_event(event, dlt_record_sender)
elif isinstance(event, DeviceEvent):
LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_device_event(event, dlt_record_sender)
elif isinstance(event, LinkEvent):
LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event)))
self.process_link_event(event, dlt_record_sender)
else:
LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
dlt_record_sender.commit()
if dlt_connector_client is not None:
dlt_connector_client.close()
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
topology_id = event.topology_id
topology_uuid = topology_id.topology_uuid.uuid
context_id = topology_id.context_id
context_uuid = context_id.context_uuid.uuid
topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
context = self.context_client.GetContext(context_id)
context_name = context.name
topology_details = self.context_client.GetTopologyDetails(topology_id)
topology_name = topology_details.name
if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \
(topology_uuid not in topology_uuids) and (topology_name not in topology_uuids):
for device in topology_details.devices:
dlt_record_sender.add_device(topology_id, device)
for link in topology_details.links:
dlt_record_sender.add_link(topology_id, link)
else:
MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})'
args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
LOGGER.warning(MSG.format(*args))
#Check which ID to use.
def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
device_id = event.device_id
device_uuid = device_id.device_uuid.uuid
device = self.context_client.GetDevice(device_id)
dlt_record_sender.add_device(device_id.context_id, device)
def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
link_id = event.link_id
link = self.context_client.GetLink(link_id)
dlt_record_sender.add_link(link_id.context_id, link)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment