From 1fcc9f27c376f280975ad2b7349cd7a63c7b2625 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Thu, 25 May 2023 17:33:47 +0000
Subject: [PATCH] Interdomain component:

- Made the dependency on DLT optional
- Moved old code to separate file
- Implemented auto-detection of remote domains based on NETWORK-type devices onboarded to Context
---
 .../service/InterdomainServiceServicerImpl.py | 24 +++--
 .../service/RemoteDomainClients.py            | 95 +++++++++++++++----
 src/interdomain/service/__main__.py           |  8 +-
 .../service/_old_code/add_peer_manually.txt   |  3 +
 .../topology_abstractor/DltRecordSender.py    |  8 +-
 .../topology_abstractor/TopologyAbstractor.py | 20 +++-
 6 files changed, 119 insertions(+), 39 deletions(-)
 create mode 100644 src/interdomain/service/_old_code/add_peer_manually.txt

diff --git a/src/interdomain/service/InterdomainServiceServicerImpl.py b/src/interdomain/service/InterdomainServiceServicerImpl.py
index b72fc1b31..51c8ee39a 100644
--- a/src/interdomain/service/InterdomainServiceServicerImpl.py
+++ b/src/interdomain/service/InterdomainServiceServicerImpl.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 
 import grpc, logging, uuid
-from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
+from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
+from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name
 from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId
 from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
@@ -35,8 +36,6 @@ LOGGER = logging.getLogger(__name__)
 
 METRICS_POOL = MetricsPool('Interdomain', 'RPC')
 
-USE_DLT = True
-
 class InterdomainServiceServicerImpl(InterdomainServiceServicer):
     def __init__(self, remote_domain_clients : RemoteDomainClients):
         LOGGER.debug('Creating Servicer...')
@@ -48,7 +47,6 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
         context_client = ContextClient()
         pathcomp_client = PathCompClient()
         slice_client = SliceClient()
-        dlt_connector_client = DltConnectorClient()
 
         local_device_uuids = get_local_device_uuids(context_client)
         slice_owner_uuid = request.slice_owner.owner_uuid.uuid
@@ -87,6 +85,17 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
         reply = Slice()
         reply.CopyFrom(request)
 
+        missing_env_vars = find_missing_environment_variables([
+            get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST     ),
+            get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
+        ])
+        if len(missing_env_vars) == 0:
+            # DLT available
+            dlt_connector_client = DltConnectorClient()
+            dlt_connector_client.connect()
+        else:
+            dlt_connector_client = None
+
         dlt_record_sender = DltRecordSender(context_client, dlt_connector_client)
 
         for domain_uuid, is_local_domain, endpoint_ids in traversed_domains:
@@ -119,7 +128,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
                 LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
                 sub_slice_id = context_client.SetSlice(sub_slice)
 
-                if USE_DLT:
+                if dlt_connector_client is not None:
                     topology_id = TopologyId(**json_topology_id(domain_uuid))
                     dlt_record_sender.add_slice(topology_id, sub_slice)
                 else:
@@ -137,8 +146,10 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
             LOGGER.info('[loop] adding sub-slice')
             reply.slice_subslice_ids.add().CopyFrom(sub_slice_id)   # pylint: disable=no-member
 
-        LOGGER.info('Recording Remote Slice requests to DLT')
-        dlt_record_sender.commit()
+        if dlt_connector_client is not None:
+            LOGGER.info('Recording Remote Slice requests to DLT')
+            dlt_record_sender.commit()
+            dlt_connector_client.close() # release the per-request client opened by connect() above
 
         LOGGER.info('Activating interdomain slice')
         reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member
diff --git a/src/interdomain/service/RemoteDomainClients.py b/src/interdomain/service/RemoteDomainClients.py
index 297c9a60d..e28176ef4 100644
--- a/src/interdomain/service/RemoteDomainClients.py
+++ b/src/interdomain/service/RemoteDomainClients.py
@@ -12,44 +12,101 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, socket
+import logging, threading
+from typing import Optional, Tuple
 from common.Constants import DEFAULT_CONTEXT_NAME, ServiceNameEnum
+from common.DeviceTypes import DeviceTypeEnum
 from common.Settings import get_service_host, get_service_port_grpc
+from common.proto.context_pb2 import ConfigActionEnum, DeviceEvent
 from common.proto.context_pb2 import TeraFlowController
+from common.tools.context_queries.Device import get_device
+from common.tools.grpc.Tools import grpc_message_to_json_string
+from context.client.ContextClient import ContextClient
+from context.client.EventsCollector import EventsCollector
 from interdomain.client.InterdomainClient import InterdomainClient
 
 LOGGER = logging.getLogger(__name__)
 
-class RemoteDomainClients:
+def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, int]]:
+    # Return (name, address, port) of the remote domain described by the event's NETWORK-type device, else None.
+    device_uuid = event.device_id.device_uuid.uuid
+    device = get_device(
+        context_client, device_uuid, include_endpoints=False,
+        include_components=False, include_config_rules=True)
+    # NOTE(review): get_device presumably returns None when the device cannot be retrieved (e.g., removed) - confirm.
+    if device is None or device.device_type != DeviceTypeEnum.NETWORK.value: return None
+    idc_domain_address, idc_domain_port = None, None
+    for config_rule in device.device_config.config_rules:
+        if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
+        if config_rule.WhichOneof('config_rule') != 'custom': continue
+        if config_rule.custom.resource_key == '_connect/address':
+            idc_domain_address = config_rule.custom.resource_value
+        if config_rule.custom.resource_key == '_connect/port':
+            idc_domain_port = int(config_rule.custom.resource_value)
+    # Both connection settings must be present in the config rules; otherwise there is nothing to connect to.
+    if idc_domain_address is None or idc_domain_port is None: return None
+    return device.name, idc_domain_address, idc_domain_port
+
+class RemoteDomainClients(threading.Thread):
     def __init__(self) -> None:
-        self.peer_domain = {}
+        super().__init__(daemon=True)
+        self.terminate = threading.Event()
+        self.lock = threading.Lock()
+        self.peer_domains = {}
+        self.context_client = ContextClient()
+        self.context_event_collector = EventsCollector(self.context_client)
 
-    def add_peer(
-            self, domain_name : str, host : str, port : int, context_uuid : str = DEFAULT_CONTEXT_NAME
-        ) -> None:
-        while True:
+    def stop(self):
+        self.terminate.set()
+
+    def run(self) -> None:
+        self.context_client.connect()
+        self.context_event_collector.start()
+        # Poll Context events until stop() is requested; devices describing a remote domain are added as peers.
+        while not self.terminate.is_set():
+            event = self.context_event_collector.get_event(timeout=0.1)
+            if not isinstance(event, DeviceEvent): continue # also skips None (no event retrieved)
+            LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
+            domain_data = get_domain_data(self.context_client, event)
+            if domain_data is None: continue # get_domain_data() found no remote-domain data; unpacking would raise
+            domain_name, domain_address, domain_port = domain_data
             try:
-                remote_teraflow_ip = socket.gethostbyname(host)
-                if len(remote_teraflow_ip) > 0: break
-            except socket.gaierror as e:
-                if str(e) == '[Errno -2] Name or service not known': continue
+                self.add_peer(domain_name, domain_address, domain_port)
+            except: # pylint: disable=bare-except
+                MSG = 'Unable to connect to remote domain {:s} ({:s}:{:d})'
+                LOGGER.exception(MSG.format(domain_name, domain_address, domain_port))
 
-        interdomain_client = InterdomainClient(host=host, port=port)
+        self.context_event_collector.stop()
+        self.context_client.close()
+
+    def add_peer(
+        self, domain_name : str, domain_address : str, domain_port : int, context_uuid : str = DEFAULT_CONTEXT_NAME
+    ) -> None:
         request = TeraFlowController()
-        request.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME # pylint: disable=no-member
+        request.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
         request.ip_address = get_service_host(ServiceNameEnum.INTERDOMAIN)
         request.port = int(get_service_port_grpc(ServiceNameEnum.INTERDOMAIN))
 
+        interdomain_client = InterdomainClient(host=domain_address, port=domain_port)
+        interdomain_client.connect()
+
         reply = interdomain_client.Authenticate(request)
+
         if not reply.authenticated:
-            msg = 'Authentication against {:s}:{:d} rejected'
-            raise Exception(msg.format(str(remote_teraflow_ip), port))
+            MSG = 'Authentication against {:s}:{:d} with Context({:s}) rejected'
+            # pylint: disable=broad-exception-raised
+            raise Exception(MSG.format(domain_address, domain_port, domain_name))
 
-        self.peer_domain[domain_name] = interdomain_client
+        with self.lock:
+            self.peer_domains[domain_name] = interdomain_client
+            LOGGER.info('Added peer domain {:s} ({:s}:{:d})'.format(domain_name, domain_address, domain_port))
 
     def get_peer(self, domain_name : str) -> InterdomainClient:
-        LOGGER.warning('peers: {:s}'.format(str(self.peer_domain)))
-        return self.peer_domain.get(domain_name)
+        with self.lock:
+            LOGGER.warning('peers: {:s}'.format(str(self.peer_domains)))
+            return self.peer_domains.get(domain_name)
 
     def remove_peer(self, domain_name : str) -> None:
-        return self.peer_domain.pop(domain_name, None)
+        with self.lock:
+            self.peer_domains.pop(domain_name, None)
+            LOGGER.info('Removed peer domain {:s}'.format(domain_name))
diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py
index 73fa93539..f867dc378 100644
--- a/src/interdomain/service/__main__.py
+++ b/src/interdomain/service/__main__.py
@@ -43,8 +43,6 @@ def main():
         get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
         get_env_var_name(ServiceNameEnum.SLICE,    ENVVAR_SUFIX_SERVICE_HOST     ),
         get_env_var_name(ServiceNameEnum.SLICE,    ENVVAR_SUFIX_SERVICE_PORT_GRPC),
-        get_env_var_name(ServiceNameEnum.DLT,      ENVVAR_SUFIX_SERVICE_HOST     ),
-        get_env_var_name(ServiceNameEnum.DLT,      ENVVAR_SUFIX_SERVICE_PORT_GRPC),
     ])
 
     signal.signal(signal.SIGINT,  signal_handler)
@@ -58,6 +56,7 @@ def main():
 
     # Define remote domain clients
     remote_domain_clients = RemoteDomainClients()
+    remote_domain_clients.start()
 
     # Starting Interdomain service
     grpc_service = InterdomainService(remote_domain_clients)
@@ -67,16 +66,13 @@ def main():
     topology_abstractor = TopologyAbstractor()
     topology_abstractor.start()
 
-    # TODO: improve with configuration the definition of the remote peers
-    #interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
-    #remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
-
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
 
     LOGGER.info('Terminating...')
     topology_abstractor.stop()
     grpc_service.stop()
+    remote_domain_clients.stop()
 
     LOGGER.info('Bye')
     return 0
diff --git a/src/interdomain/service/_old_code/add_peer_manually.txt b/src/interdomain/service/_old_code/add_peer_manually.txt
new file mode 100644
index 000000000..6582044b3
--- /dev/null
+++ b/src/interdomain/service/_old_code/add_peer_manually.txt
@@ -0,0 +1,3 @@
+# TODO: improve with configuration the definition of the remote peers
+#interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
+#remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py
index d6efbc809..c9a61ef69 100644
--- a/src/interdomain/service/topology_abstractor/DltRecordSender.py
+++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
 from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId
 from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
 from context.client.ContextClient import ContextClient
@@ -23,7 +23,7 @@ from .Types import DltRecordTypes
 LOGGER = logging.getLogger(__name__)
 
 class DltRecordSender:
-    def __init__(self, context_client : ContextClient, dlt_connector_client : DltConnectorClient) -> None:
+    def __init__(self, context_client : ContextClient, dlt_connector_client : Optional[DltConnectorClient]) -> None:
         self.context_client = context_client
         self.dlt_connector_client = dlt_connector_client
         self.dlt_record_uuids : List[str] = list()
@@ -65,24 +65,28 @@ class DltRecordSender:
             topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid]
             if isinstance(dlt_record, Device):
                 device_id = self.context_client.SetDevice(dlt_record)
+                if self.dlt_connector_client is None: continue
                 dlt_device_id = DltDeviceId()
                 dlt_device_id.topology_id.CopyFrom(topology_id)     # pylint: disable=no-member
                 dlt_device_id.device_id.CopyFrom(device_id)         # pylint: disable=no-member
                 self.dlt_connector_client.RecordDevice(dlt_device_id)
             elif isinstance(dlt_record, Link):
                 link_id = self.context_client.SetLink(dlt_record)
+                if self.dlt_connector_client is None: continue
                 dlt_link_id = DltLinkId()
                 dlt_link_id.topology_id.CopyFrom(topology_id)       # pylint: disable=no-member
                 dlt_link_id.link_id.CopyFrom(link_id)               # pylint: disable=no-member
                 self.dlt_connector_client.RecordLink(dlt_link_id)
             elif isinstance(dlt_record, Service):
                 service_id = self.context_client.SetService(dlt_record)
+                if self.dlt_connector_client is None: continue
                 dlt_service_id = DltServiceId()
                 dlt_service_id.topology_id.CopyFrom(topology_id)    # pylint: disable=no-member
                 dlt_service_id.service_id.CopyFrom(service_id)      # pylint: disable=no-member
                 self.dlt_connector_client.RecordService(dlt_service_id)
             elif isinstance(dlt_record, Slice):
                 slice_id = self.context_client.SetSlice(dlt_record)
+                if self.dlt_connector_client is None: continue
                 dlt_slice_id = DltSliceId()
                 dlt_slice_id.topology_id.CopyFrom(topology_id)      # pylint: disable=no-member
                 dlt_slice_id.slice_id.CopyFrom(slice_id)            # pylint: disable=no-member
diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py
index bdbf016f8..20b186f30 100644
--- a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py
+++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py
@@ -14,8 +14,9 @@
 
 import logging, threading
 from typing import Dict, Optional, Tuple
-from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
+from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
 from common.DeviceTypes import DeviceTypeEnum
+from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name
 from common.proto.context_pb2 import (
     ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId,
     TopologyEvent)
@@ -48,7 +49,6 @@ class TopologyAbstractor(threading.Thread):
         self.terminate = threading.Event()
 
         self.context_client = ContextClient()
-        self.dlt_connector_client = DltConnectorClient()
         self.context_event_collector = EventsCollector(self.context_client)
 
         self.real_to_abstract_device_uuid : Dict[str, str] = dict()
@@ -69,7 +69,6 @@ class TopologyAbstractor(threading.Thread):
         topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
         create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
 
-        self.dlt_connector_client.connect()
         self.context_event_collector.start()
 
         while not self.terminate.is_set():
@@ -81,7 +80,6 @@ class TopologyAbstractor(threading.Thread):
 
         self.context_event_collector.stop()
         self.context_client.close()
-        self.dlt_connector_client.close()
 
     #def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]:
     #    # TODO: filter events resulting from abstraction computation
@@ -226,7 +224,18 @@ class TopologyAbstractor(threading.Thread):
             if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link)
 
     def update_abstraction(self, event : EventTypes) -> None:
-        dlt_record_sender = DltRecordSender(self.context_client, self.dlt_connector_client)
+        missing_env_vars = find_missing_environment_variables([
+            get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST     ),
+            get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
+        ])
+        if len(missing_env_vars) == 0:
+            # DLT available
+            dlt_connector_client = DltConnectorClient()
+            dlt_connector_client.connect()
+        else:
+            dlt_connector_client = None
+
+        dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
 
         if isinstance(event, ContextEvent):
             LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
@@ -286,3 +295,4 @@ class TopologyAbstractor(threading.Thread):
             LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
 
         dlt_record_sender.commit()
+        if dlt_connector_client is not None: dlt_connector_client.close() # None when DLT env vars are missing
-- 
GitLab