Skip to content
Snippets Groups Projects
Commit 1fcc9f27 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Interdomain component:

- Made optional dependency on DLT
- Moved old code to separate file
- Implemented auto-detection of remote domains based on NETWORK-type devices onboarded to Context
parent e310be8b
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!110Partial fix OECC/PSC'22 test, InterDomain component, and other side improvements
...@@ -13,7 +13,8 @@ ...@@ -13,7 +13,8 @@
# limitations under the License. # limitations under the License.
import grpc, logging, uuid import grpc, logging, uuid
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name
from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
...@@ -35,8 +36,6 @@ LOGGER = logging.getLogger(__name__) ...@@ -35,8 +36,6 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Interdomain', 'RPC') METRICS_POOL = MetricsPool('Interdomain', 'RPC')
USE_DLT = True
class InterdomainServiceServicerImpl(InterdomainServiceServicer): class InterdomainServiceServicerImpl(InterdomainServiceServicer):
def __init__(self, remote_domain_clients : RemoteDomainClients): def __init__(self, remote_domain_clients : RemoteDomainClients):
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
...@@ -48,7 +47,6 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): ...@@ -48,7 +47,6 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
context_client = ContextClient() context_client = ContextClient()
pathcomp_client = PathCompClient() pathcomp_client = PathCompClient()
slice_client = SliceClient() slice_client = SliceClient()
dlt_connector_client = DltConnectorClient()
local_device_uuids = get_local_device_uuids(context_client) local_device_uuids = get_local_device_uuids(context_client)
slice_owner_uuid = request.slice_owner.owner_uuid.uuid slice_owner_uuid = request.slice_owner.owner_uuid.uuid
...@@ -87,6 +85,17 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): ...@@ -87,6 +85,17 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
reply = Slice() reply = Slice()
reply.CopyFrom(request) reply.CopyFrom(request)
missing_env_vars = find_missing_environment_variables([
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
if len(missing_env_vars) == 0:
# DLT available
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
else:
dlt_connector_client = None
dlt_record_sender = DltRecordSender(context_client, dlt_connector_client) dlt_record_sender = DltRecordSender(context_client, dlt_connector_client)
for domain_uuid, is_local_domain, endpoint_ids in traversed_domains: for domain_uuid, is_local_domain, endpoint_ids in traversed_domains:
...@@ -119,7 +128,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): ...@@ -119,7 +128,7 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
sub_slice_id = context_client.SetSlice(sub_slice) sub_slice_id = context_client.SetSlice(sub_slice)
if USE_DLT: if dlt_connector_client is not None:
topology_id = TopologyId(**json_topology_id(domain_uuid)) topology_id = TopologyId(**json_topology_id(domain_uuid))
dlt_record_sender.add_slice(topology_id, sub_slice) dlt_record_sender.add_slice(topology_id, sub_slice)
else: else:
...@@ -137,8 +146,9 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): ...@@ -137,8 +146,9 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
LOGGER.info('[loop] adding sub-slice') LOGGER.info('[loop] adding sub-slice')
reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member
LOGGER.info('Recording Remote Slice requests to DLT') if dlt_connector_client is not None:
dlt_record_sender.commit() LOGGER.info('Recording Remote Slice requests to DLT')
dlt_record_sender.commit()
LOGGER.info('Activating interdomain slice') LOGGER.info('Activating interdomain slice')
reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member
......
...@@ -12,44 +12,101 @@ ...@@ -12,44 +12,101 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, socket import logging, threading
from typing import Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, ServiceNameEnum from common.Constants import DEFAULT_CONTEXT_NAME, ServiceNameEnum
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import get_service_host, get_service_port_grpc from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import ConfigActionEnum, DeviceEvent
from common.proto.context_pb2 import TeraFlowController from common.proto.context_pb2 import TeraFlowController
from common.tools.context_queries.Device import get_device
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from interdomain.client.InterdomainClient import InterdomainClient from interdomain.client.InterdomainClient import InterdomainClient
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class RemoteDomainClients: def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, int]]:
device_uuid = event.device_id.device_uuid.uuid
device = get_device(
context_client, device_uuid, include_endpoints=False,
include_components=False, include_config_rules=True)
if device.device_type != DeviceTypeEnum.NETWORK.value: return None
idc_domain_name = device.name
idc_domain_address = None
idc_domain_port = None
for config_rule in device.device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
if config_rule.custom.resource_key == '_connect/address':
idc_domain_address = config_rule.custom.resource_value
if config_rule.custom.resource_key == '_connect/port':
idc_domain_port = int(config_rule.custom.resource_value)
if idc_domain_address is None: return None
if idc_domain_port is None: return None
return idc_domain_name, idc_domain_address, idc_domain_port
class RemoteDomainClients(threading.Thread):
def __init__(self) -> None: def __init__(self) -> None:
self.peer_domain = {} super().__init__(daemon=True)
self.terminate = threading.Event()
self.lock = threading.Lock()
self.peer_domains = {}
self.context_client = ContextClient()
self.context_event_collector = EventsCollector(self.context_client)
def add_peer( def stop(self):
self, domain_name : str, host : str, port : int, context_uuid : str = DEFAULT_CONTEXT_NAME self.terminate.set()
) -> None:
while True: def run(self) -> None:
self.context_client.connect()
self.context_event_collector.start()
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event is None: continue
if not isinstance(event, DeviceEvent): continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
domain_data = get_domain_data(self.context_client, event)
domain_name, domain_address, domain_port = domain_data
try: try:
remote_teraflow_ip = socket.gethostbyname(host) self.add_peer(domain_name, domain_address, domain_port)
if len(remote_teraflow_ip) > 0: break except: # pylint: disable=bare-except
except socket.gaierror as e: MSG = 'Unable to connect to remote domain {:s} ({:s}:{:d})'
if str(e) == '[Errno -2] Name or service not known': continue LOGGER.exception(MSG.format(domain_name, domain_address, domain_port))
interdomain_client = InterdomainClient(host=host, port=port) self.context_event_collector.stop()
self.context_client.close()
def add_peer(
self, domain_name : str, domain_address : str, domain_port : int, context_uuid : str = DEFAULT_CONTEXT_NAME
) -> None:
request = TeraFlowController() request = TeraFlowController()
request.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME # pylint: disable=no-member request.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
request.ip_address = get_service_host(ServiceNameEnum.INTERDOMAIN) request.ip_address = get_service_host(ServiceNameEnum.INTERDOMAIN)
request.port = int(get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)) request.port = int(get_service_port_grpc(ServiceNameEnum.INTERDOMAIN))
interdomain_client = InterdomainClient(host=domain_address, port=domain_port)
interdomain_client.connect()
reply = interdomain_client.Authenticate(request) reply = interdomain_client.Authenticate(request)
if not reply.authenticated: if not reply.authenticated:
msg = 'Authentication against {:s}:{:d} rejected' MSG = 'Authentication against {:s}:{:d} with Context({:s}) rejected'
raise Exception(msg.format(str(remote_teraflow_ip), port)) # pylint: disable=broad-exception-raised
raise Exception(MSG.format(domain_address, domain_port, domain_name))
self.peer_domain[domain_name] = interdomain_client with self.lock:
self.peer_domains[domain_name] = interdomain_client
LOGGER.info('Added peer domain {:s} ({:s}:{:d})'.format(domain_name, domain_address, domain_port))
def get_peer(self, domain_name : str) -> InterdomainClient: def get_peer(self, domain_name : str) -> InterdomainClient:
LOGGER.warning('peers: {:s}'.format(str(self.peer_domain))) with self.lock:
return self.peer_domain.get(domain_name) LOGGER.warning('peers: {:s}'.format(str(self.peer_domains)))
return self.peer_domains.get(domain_name)
def remove_peer(self, domain_name : str) -> None: def remove_peer(self, domain_name : str) -> None:
return self.peer_domain.pop(domain_name, None) with self.lock:
self.peer_domains.pop(domain_name, None)
LOGGER.info('Removed peer domain {:s}'.format(domain_name))
...@@ -43,8 +43,6 @@ def main(): ...@@ -43,8 +43,6 @@ def main():
get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]) ])
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
...@@ -58,6 +56,7 @@ def main(): ...@@ -58,6 +56,7 @@ def main():
# Define remote domain clients # Define remote domain clients
remote_domain_clients = RemoteDomainClients() remote_domain_clients = RemoteDomainClients()
remote_domain_clients.start()
# Starting Interdomain service # Starting Interdomain service
grpc_service = InterdomainService(remote_domain_clients) grpc_service = InterdomainService(remote_domain_clients)
...@@ -67,16 +66,13 @@ def main(): ...@@ -67,16 +66,13 @@ def main():
topology_abstractor = TopologyAbstractor() topology_abstractor = TopologyAbstractor()
topology_abstractor.start() topology_abstractor.start()
# TODO: improve with configuration the definition of the remote peers
#interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
#remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...') LOGGER.info('Terminating...')
topology_abstractor.stop() topology_abstractor.stop()
grpc_service.stop() grpc_service.stop()
remote_domain_clients.stop()
LOGGER.info('Bye') LOGGER.info('Bye')
return 0 return 0
......
# TODO: improve with configuration the definition of the remote peers
#interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
#remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Dict, List, Tuple from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
...@@ -23,7 +23,7 @@ from .Types import DltRecordTypes ...@@ -23,7 +23,7 @@ from .Types import DltRecordTypes
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class DltRecordSender: class DltRecordSender:
def __init__(self, context_client : ContextClient, dlt_connector_client : DltConnectorClient) -> None: def __init__(self, context_client : ContextClient, dlt_connector_client : Optional[DltConnectorClient]) -> None:
self.context_client = context_client self.context_client = context_client
self.dlt_connector_client = dlt_connector_client self.dlt_connector_client = dlt_connector_client
self.dlt_record_uuids : List[str] = list() self.dlt_record_uuids : List[str] = list()
...@@ -65,24 +65,28 @@ class DltRecordSender: ...@@ -65,24 +65,28 @@ class DltRecordSender:
topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid]
if isinstance(dlt_record, Device): if isinstance(dlt_record, Device):
device_id = self.context_client.SetDevice(dlt_record) device_id = self.context_client.SetDevice(dlt_record)
if self.dlt_connector_client is None: continue
dlt_device_id = DltDeviceId() dlt_device_id = DltDeviceId()
dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member
self.dlt_connector_client.RecordDevice(dlt_device_id) self.dlt_connector_client.RecordDevice(dlt_device_id)
elif isinstance(dlt_record, Link): elif isinstance(dlt_record, Link):
link_id = self.context_client.SetLink(dlt_record) link_id = self.context_client.SetLink(dlt_record)
if self.dlt_connector_client is None: continue
dlt_link_id = DltLinkId() dlt_link_id = DltLinkId()
dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member
self.dlt_connector_client.RecordLink(dlt_link_id) self.dlt_connector_client.RecordLink(dlt_link_id)
elif isinstance(dlt_record, Service): elif isinstance(dlt_record, Service):
service_id = self.context_client.SetService(dlt_record) service_id = self.context_client.SetService(dlt_record)
if self.dlt_connector_client is None: continue
dlt_service_id = DltServiceId() dlt_service_id = DltServiceId()
dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member
self.dlt_connector_client.RecordService(dlt_service_id) self.dlt_connector_client.RecordService(dlt_service_id)
elif isinstance(dlt_record, Slice): elif isinstance(dlt_record, Slice):
slice_id = self.context_client.SetSlice(dlt_record) slice_id = self.context_client.SetSlice(dlt_record)
if self.dlt_connector_client is None: continue
dlt_slice_id = DltSliceId() dlt_slice_id = DltSliceId()
dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
......
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
import logging, threading import logging, threading
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.DeviceTypes import DeviceTypeEnum from common.DeviceTypes import DeviceTypeEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId,
TopologyEvent) TopologyEvent)
...@@ -48,7 +49,6 @@ class TopologyAbstractor(threading.Thread): ...@@ -48,7 +49,6 @@ class TopologyAbstractor(threading.Thread):
self.terminate = threading.Event() self.terminate = threading.Event()
self.context_client = ContextClient() self.context_client = ContextClient()
self.dlt_connector_client = DltConnectorClient()
self.context_event_collector = EventsCollector(self.context_client) self.context_event_collector = EventsCollector(self.context_client)
self.real_to_abstract_device_uuid : Dict[str, str] = dict() self.real_to_abstract_device_uuid : Dict[str, str] = dict()
...@@ -69,7 +69,6 @@ class TopologyAbstractor(threading.Thread): ...@@ -69,7 +69,6 @@ class TopologyAbstractor(threading.Thread):
topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
self.dlt_connector_client.connect()
self.context_event_collector.start() self.context_event_collector.start()
while not self.terminate.is_set(): while not self.terminate.is_set():
...@@ -81,7 +80,6 @@ class TopologyAbstractor(threading.Thread): ...@@ -81,7 +80,6 @@ class TopologyAbstractor(threading.Thread):
self.context_event_collector.stop() self.context_event_collector.stop()
self.context_client.close() self.context_client.close()
self.dlt_connector_client.close()
#def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]: #def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]:
# # TODO: filter events resulting from abstraction computation # # TODO: filter events resulting from abstraction computation
...@@ -226,7 +224,18 @@ class TopologyAbstractor(threading.Thread): ...@@ -226,7 +224,18 @@ class TopologyAbstractor(threading.Thread):
if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link) if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link)
def update_abstraction(self, event : EventTypes) -> None: def update_abstraction(self, event : EventTypes) -> None:
dlt_record_sender = DltRecordSender(self.context_client, self.dlt_connector_client) missing_env_vars = find_missing_environment_variables([
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
if len(missing_env_vars) == 0:
# DLT available
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
else:
dlt_connector_client = None
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
if isinstance(event, ContextEvent): if isinstance(event, ContextEvent):
LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
...@@ -286,3 +295,4 @@ class TopologyAbstractor(threading.Thread): ...@@ -286,3 +295,4 @@ class TopologyAbstractor(threading.Thread):
LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
dlt_record_sender.commit() dlt_record_sender.commit()
dlt_connector_client.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment