diff --git a/src/interdomain/Dockerfile b/src/interdomain/Dockerfile index 69fcf3d9c52b9dc6232a2a8f3051acba88987408..66c6e938d7adf5bd985fc0b4f87fe284c1370be4 100644 --- a/src/interdomain/Dockerfile +++ b/src/interdomain/Dockerfile @@ -68,7 +68,7 @@ COPY src/dlt/. dlt/ COPY src/interdomain/. interdomain/ #COPY src/monitoring/. monitoring/ COPY src/pathcomp/. pathcomp/ -#COPY src/service/. service/ +COPY src/service/. service/ COPY src/slice/. slice/ # Start the service diff --git a/src/interdomain/service/InterdomainServiceServicerImpl.py b/src/interdomain/service/InterdomainServiceServicerImpl.py index 51c8ee39aa0fc70aa96fe8154cbc312043d2c488..fa6bec912413234d225ee1e4662fba0b2cb82b28 100644 --- a/src/interdomain/service/InterdomainServiceServicerImpl.py +++ b/src/interdomain/service/InterdomainServiceServicerImpl.py @@ -12,25 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, Tuple import grpc, logging, uuid -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name -from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name) +from common.proto.context_pb2 import ( + AuthenticationResult, Empty, EndPointId, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId) from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.tools.context_queries.CheckType import endpoint_type_is_border from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import get_device from common.tools.context_queries.InterDomain import ( - compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain) -from common.tools.context_queries.Topology import create_topology -from common.tools.grpc.Tools import grpc_message_to_json_string + compute_interdomain_sub_slices, get_local_device_uuids, is_inter_domain) +from common.tools.context_queries.Slice import get_slice_by_id +from common.tools.context_queries.Topology import create_topology, get_topology +from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Device import json_device_id +from common.tools.object_factory.EndPoint import json_endpoint_id from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from dlt.connector.client.DltConnectorClient import DltConnectorClient -from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender from pathcomp.frontend.client.PathCompClient import PathCompClient +from service.client.ServiceClient import ServiceClient from slice.client.SliceClient import SliceClient +from .topology_abstractor.DltRecordSender import DltRecordSender from .RemoteDomainClients import RemoteDomainClients -from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real +from .Tools import compose_slice, compute_slice_owner #, map_abstract_endpoints_to_real LOGGER = logging.getLogger(__name__) @@ -57,39 +67,26 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): str_slice = grpc_message_to_json_string(request) raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice)) - interdomain_path = compute_interdomain_path(pathcomp_client, request) - str_interdomain_path = [ - [device_uuid, [ - (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) - for endpoint_id in endpoint_ids - ]] - for device_uuid, endpoint_ids in interdomain_path - ] - LOGGER.info('interdomain_path={:s}'.format(str(str_interdomain_path))) - - traversed_domains = compute_traversed_domains(context_client, interdomain_path) - str_traversed_domains = [ - (domain_uuid, is_local_domain, [ - (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) - for endpoint_id in endpoint_ids - ]) - for domain_uuid,is_local_domain,endpoint_ids in traversed_domains - ] - LOGGER.info('traversed_domains={:s}'.format(str(str_traversed_domains))) - - slice_owner_uuid = compute_slice_owner(context_client, traversed_domains) - LOGGER.info('slice_owner_uuid={:s}'.format(str(slice_owner_uuid))) + local_slices, remote_slices = compute_interdomain_sub_slices( + context_client, pathcomp_client, request) + + traversed_domain_uuids = set() + traversed_domain_uuids.update(local_slices.keys()) + traversed_domain_uuids.update(remote_slices.keys()) + LOGGER.debug('traversed_domain_uuids={:s}'.format(str(traversed_domain_uuids))) + slice_owner_uuid = compute_slice_owner(context_client, traversed_domain_uuids) + LOGGER.debug('slice_owner_uuid={:s}'.format(str(slice_owner_uuid))) if slice_owner_uuid is None: raise Exception('Unable to identify slice owner') reply = Slice() reply.CopyFrom(request) - missing_env_vars = find_missing_environment_variables([ + env_vars = find_environment_variables([ get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) - if len(missing_env_vars) == 0: + if len(env_vars) == 2: # DLT available dlt_connector_client = DltConnectorClient() dlt_connector_client.connect() @@ -98,41 +95,80 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): dlt_record_sender = DltRecordSender(context_client, dlt_connector_client) - for domain_uuid, is_local_domain, endpoint_ids in traversed_domains: - if is_local_domain: + for domain_uuid, endpoint_id_groups in local_slices.items(): + domain_topology = get_topology(context_client, domain_uuid) + if domain_topology is None: raise Exception('Topology({:s}) not found'.format(str(domain_uuid))) + domain_name = domain_topology.name + for endpoint_ids in endpoint_id_groups: slice_uuid = str(uuid.uuid4()) - LOGGER.info('[loop] [local] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format( - str(domain_uuid), str(is_local_domain), str(slice_uuid))) + MSG = '[loop] [local] domain_uuid={:s} slice_uuid={:s} endpoint_ids={:s}' + LOGGER.debug(MSG.format(str(domain_uuid), str(slice_uuid), str([ + grpc_message_to_json(ep_id) for ep_id in endpoint_ids + ]))) # local slices always in DEFAULT_CONTEXT_NAME #context_uuid = request.slice_id.context_id.context_uuid.uuid context_uuid = DEFAULT_CONTEXT_NAME - endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids) + #endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids) + slice_name = '{:s}:local:{:s}'.format(request.name, domain_name) sub_slice = compose_slice( - context_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, + context_uuid, slice_uuid, endpoint_ids, slice_name=slice_name, constraints=request.slice_constraints, config_rules=request.slice_config.config_rules) - LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) + LOGGER.debug('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = slice_client.CreateSlice(sub_slice) - else: - slice_uuid = request.slice_id.slice_uuid.uuid - LOGGER.info('[loop] [remote] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format( - str(domain_uuid), str(is_local_domain), str(slice_uuid))) + + LOGGER.debug('[loop] adding sub-slice') + reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member + + for domain_uuid, endpoint_id_groups in remote_slices.items(): + domain_topology = get_device(context_client, domain_uuid) + if domain_topology is None: raise Exception('Device({:s}) not found'.format(str(domain_uuid))) + domain_name = domain_topology.name + domain_endpoint_ids_to_names = { + endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.name + for endpoint in domain_topology.device_endpoints + if endpoint_type_is_border(endpoint.endpoint_type) + } + for endpoint_ids in endpoint_id_groups: + slice_uuid = str(uuid.uuid4()) + MSG = '[loop] [remote] domain_uuid={:s} slice_uuid={:s} endpoint_ids={:s}' + LOGGER.debug(MSG.format(str(domain_uuid), str(slice_uuid), str([ + grpc_message_to_json(ep_id) for ep_id in endpoint_ids + ]))) # create context/topology for the remote domains where we are creating slices - create_context(context_client, domain_uuid) + create_context(context_client, domain_uuid, name=domain_name) create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME) + create_topology(context_client, domain_uuid, INTERDOMAIN_TOPOLOGY_NAME) + + slice_name = '{:s}:remote:{:s}'.format(request.name, domain_name) + # convert endpoint ids to names to enable conversion to uuids on the remote domain + endpoint_ids = [ + EndPointId(**json_endpoint_id( + json_device_id(domain_name), + domain_endpoint_ids_to_names[endpoint_id.endpoint_uuid.uuid], + topology_id=json_topology_id( + INTERDOMAIN_TOPOLOGY_NAME, + context_id=json_context_id(DEFAULT_CONTEXT_NAME) + ) + )) + for endpoint_id in endpoint_ids + ] sub_slice = compose_slice( - domain_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, - config_rules=request.slice_config.config_rules, owner_uuid=slice_owner_uuid) - LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) + DEFAULT_CONTEXT_NAME, slice_uuid, endpoint_ids, slice_name=slice_name, + constraints=request.slice_constraints, config_rules=request.slice_config.config_rules, + owner_uuid=slice_owner_uuid, owner_string=domain_uuid) + LOGGER.debug('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = context_client.SetSlice(sub_slice) if dlt_connector_client is not None: topology_id = TopologyId(**json_topology_id(domain_uuid)) dlt_record_sender.add_slice(topology_id, sub_slice) else: - interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow') + interdomain_client = self.remote_domain_clients.get_peer(domain_uuid) + if interdomain_client is None: + raise Exception('InterDomain Client not found for Domain({:s})'.format(str(domain_uuid))) sub_slice_reply = interdomain_client.LookUpSlice(sub_slice) if sub_slice_reply == sub_slice.slice_id: # pylint: disable=no-member # successful case @@ -140,20 +176,23 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): else: # not in catalog remote_sub_slice = interdomain_client.CreateSliceAndAddToCatalog(sub_slice) + + sub_slice.slice_status.slice_status = remote_sub_slice.slice_status.slice_status + context_client.SetSlice(sub_slice) if remote_sub_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: raise Exception('Remote Slice creation failed. Wrong Slice status returned') - LOGGER.info('[loop] adding sub-slice') - reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member + LOGGER.debug('[loop] adding sub-slice') + reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member if dlt_connector_client is not None: - LOGGER.info('Recording Remote Slice requests to DLT') + LOGGER.debug('Recording Remote Slice requests to DLT') dlt_record_sender.commit() - LOGGER.info('Activating interdomain slice') + LOGGER.debug('Activating interdomain slice') reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member - LOGGER.info('Updating interdomain slice') + LOGGER.debug('Updating interdomain slice') slice_id = context_client.SetSlice(reply) return slice_id @@ -168,22 +207,133 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: try: context_client = ContextClient() - slice_ = context_client.GetSlice(request.slice_id) + slice_id = SliceId() + slice_id.CopyFrom(request.slice_id) + slice_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + slice_ = context_client.GetSlice(slice_id) return slice_.slice_id except grpc.RpcError: #LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id))) return SliceId() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: - raise NotImplementedError('OrderSliceFromCatalog') - #return Slice() + def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + context_client = ContextClient() + slice_client = SliceClient() + _request = Slice() + _request.CopyFrom(request) + _request.slice_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + + #admin_context = context_client.GetContext(ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))) + #admin_context_uuid = admin_context.context_id.context_uuid.uuid + #admin_context_name = admin_context.name + + #interdomain_topology = context_client.GetTopology(TopologyId(**json_topology_id( + # DEFAULT_TOPOLOGY_NAME, context_id=json_context_id(DEFAULT_CONTEXT_NAME) + #))) + #interdomain_topology_uuid = interdomain_topology.topology_id.topology_uuid.uuid + #interdomain_topology_name = interdomain_topology.name + + devices = context_client.ListDevices(Empty()) + interdomain_endpoint_map : Dict[str, Tuple[str, str, str, str]] = dict() + for device in devices.devices: + device_uuid = device.device_id.device_uuid.uuid + device_name = device.name + for endpoint in device.device_endpoints: + if not endpoint_type_is_border(endpoint.endpoint_type): continue + #endpoint_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid + #if endpoint_context_uuid not in {admin_context_uuid, admin_context_name}: continue + #endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid + #if endpoint_topology_uuid not in {interdomain_topology_uuid, interdomain_topology_name}: continue + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_name = endpoint.name + interdomain_endpoint_map[endpoint_name] = (device_uuid, device_name, endpoint_uuid, endpoint_name) + LOGGER.debug('interdomain_endpoint_map={:s}'.format(str(interdomain_endpoint_map))) + + # Map endpoints to local real counterparts + del _request.slice_endpoint_ids[:] + for endpoint_id in request.slice_endpoint_ids: + #endpoint_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid + #if endpoint_context_uuid not in {admin_context_uuid, admin_context_name}: + # MSG = 'Unexpected ContextId in EndPointId({:s})' + # raise Exception(MSG.format(grpc_message_to_json_string(endpoint_id))) + + #endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid + #if endpoint_topology_uuid not in {admin_topology_uuid, admin_topology_name}: + # MSG = 'Unexpected TopologyId in EndPointId({:s})' + # raise Exception(MSG.format(grpc_message_to_json_string(endpoint_id))) + + endpoint_uuid = endpoint_id.endpoint_uuid.uuid + real_endpoint = interdomain_endpoint_map.get(endpoint_uuid) + if real_endpoint is None: + MSG = 'Unable to map EndPointId({:s}) to real endpoint. interdomain_endpoint_map={:s}' + raise Exception(MSG.format(grpc_message_to_json_string(endpoint_id), str(interdomain_endpoint_map))) + real_device_uuid, _, real_endpoint_uuid, _ = real_endpoint + + real_endpoint_id = _request.slice_endpoint_ids.add() + real_endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + real_endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME + real_endpoint_id.device_id.device_uuid.uuid = real_device_uuid + real_endpoint_id.endpoint_uuid.uuid = real_endpoint_uuid + + slice_id = slice_client.CreateSlice(_request) + return context_client.GetSlice(slice_id) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + def DeleteSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: context_client = ContextClient() + try: + _slice = context_client.GetSlice(request) + except: # pylint: disable=bare-except + context_client.close() + return Empty() + + _slice_rw = Slice() + _slice_rw.CopyFrom(_slice) + _slice_rw.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_DEINIT # pylint: disable=no-member + context_client.SetSlice(_slice_rw) + + local_device_uuids = get_local_device_uuids(context_client) + slice_owner_uuid = _slice.slice_owner.owner_uuid.uuid + not_inter_domain = not is_inter_domain(context_client, _slice.slice_endpoint_ids) + no_slice_owner = len(slice_owner_uuid) == 0 + is_local_slice_owner = slice_owner_uuid in local_device_uuids + if not_inter_domain and (no_slice_owner or is_local_slice_owner): + str_slice = grpc_message_to_json_string(_slice) + raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice)) + slice_client = SliceClient() - reply = slice_client.CreateSlice(request) - if reply != request.slice_id: - raise Exception('Slice creation failed. Wrong Slice Id was returned') - return context_client.GetSlice(request.slice_id) + for subslice_id in _slice_rw.slice_subslice_ids: + sub_slice = get_slice_by_id(context_client, subslice_id, rw_copy=True) + if ':remote:' in sub_slice.name: + domain_uuid = sub_slice.slice_owner.owner_string + interdomain_client = self.remote_domain_clients.get_peer(domain_uuid) + if interdomain_client is None: + raise Exception('InterDomain Client not found for Domain({:s})'.format(str(domain_uuid))) + interdomain_client.DeleteSlice(subslice_id) + + tmp_slice = Slice() + tmp_slice.slice_id.CopyFrom(_slice_rw.slice_id) # pylint: disable=no-member + slice_subslice_id = tmp_slice.slice_subslice_ids.add() # pylint: disable=no-member + slice_subslice_id.CopyFrom(subslice_id) + context_client.UnsetSlice(tmp_slice) + + if ':remote:' in sub_slice.name: + context_client.RemoveSlice(subslice_id) + else: + slice_client.DeleteSlice(subslice_id) + + service_client = ServiceClient() + for service_id in _slice_rw.slice_service_ids: + tmp_slice = Slice() + tmp_slice.slice_id.CopyFrom(_slice_rw.slice_id) # pylint: disable=no-member + slice_service_id = tmp_slice.slice_service_ids.add() # pylint: disable=no-member + slice_service_id.CopyFrom(service_id) + context_client.UnsetSlice(tmp_slice) + service_client.DeleteService(service_id) + + context_client.RemoveSlice(request) + slice_client.close() + service_client.close() + context_client.close() + return Empty() diff --git a/src/interdomain/service/RemoteDomainClients.py b/src/interdomain/service/RemoteDomainClients.py index d60450a18287cf8297bd1a6d5bac03fbcccab408..adc6fe52b554d119db951189abcefe9e58860985 100644 --- a/src/interdomain/service/RemoteDomainClients.py +++ b/src/interdomain/service/RemoteDomainClients.py @@ -27,12 +27,13 @@ from interdomain.client.InterdomainClient import InterdomainClient LOGGER = logging.getLogger(__name__) -def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, int]]: +def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, str, int]]: device_uuid = event.device_id.device_uuid.uuid device = get_device( context_client, device_uuid, include_endpoints=False, include_components=False, include_config_rules=True) if device.device_type != DeviceTypeEnum.NETWORK.value: return None + idc_domain_uuid = device_uuid idc_domain_name = device.name idc_domain_address = None idc_domain_port = None @@ -45,7 +46,7 @@ def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Opti idc_domain_port = int(config_rule.custom.resource_value) if idc_domain_address is None: return None if idc_domain_port is None: return None - return idc_domain_name, idc_domain_address, idc_domain_port + return idc_domain_uuid, idc_domain_name, idc_domain_address, idc_domain_port class RemoteDomainClients(threading.Thread): def __init__(self) -> None: @@ -67,21 +68,22 @@ class RemoteDomainClients(threading.Thread): event = self.context_event_collector.get_event(timeout=0.1) if event is None: continue if not isinstance(event, DeviceEvent): continue - LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) + LOGGER.info('Processing DeviceEvent({:s})...'.format(grpc_message_to_json_string(event))) domain_data = get_domain_data(self.context_client, event) if domain_data is None: continue - domain_name, domain_address, domain_port = domain_data + domain_uuid, domain_name, domain_address, domain_port = domain_data try: - self.add_peer(domain_name, domain_address, domain_port) + self.add_peer(domain_uuid, domain_name, domain_address, domain_port) except: # pylint: disable=bare-except - MSG = 'Unable to connect to remote domain {:s} ({:s}:{:d})' - LOGGER.exception(MSG.format(domain_name, domain_address, domain_port)) + MSG = 'Unable to connect to remote domain {:s} {:s} ({:s}:{:d})' + LOGGER.exception(MSG.format(domain_uuid, domain_name, domain_address, domain_port)) self.context_event_collector.stop() self.context_client.close() def add_peer( - self, domain_name : str, domain_address : str, domain_port : int, context_uuid : str = DEFAULT_CONTEXT_NAME + self, domain_uuid : str, domain_name : str, domain_address : str, domain_port : int, + context_uuid : str = DEFAULT_CONTEXT_NAME ) -> None: request = TeraFlowController() request.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member @@ -96,18 +98,22 @@ class RemoteDomainClients(threading.Thread): if not reply.authenticated: MSG = 'Authentication against {:s}:{:d} with Context({:s}) rejected' # pylint: disable=broad-exception-raised - raise Exception(MSG.format(domain_address, domain_port, domain_name)) + raise Exception(MSG.format(domain_address, domain_port, context_uuid)) with self.lock: + self.peer_domains[domain_uuid] = interdomain_client self.peer_domains[domain_name] = interdomain_client - LOGGER.info('Added peer domain {:s} ({:s}:{:d})'.format(domain_name, domain_address, domain_port)) + MSG = 'Added peer domain {:s} {:s} ({:s}:{:d})' + LOGGER.info(MSG.format(domain_uuid, domain_name, domain_address, domain_port)) - def get_peer(self, domain_name : str) -> InterdomainClient: + def get_peer(self, domain_uuid_or_name : str) -> Optional[InterdomainClient]: with self.lock: - LOGGER.warning('peers: {:s}'.format(str(self.peer_domains))) - return self.peer_domains.get(domain_name) + LOGGER.debug('domain_uuid_or_name: {:s}'.format(str(domain_uuid_or_name))) + LOGGER.debug('peers: {:s}'.format(str(self.peer_domains))) + return self.peer_domains.get(domain_uuid_or_name) - def remove_peer(self, domain_name : str) -> None: + def remove_peer(self, domain_uuid_or_name : str) -> None: with self.lock: - self.peer_domains.pop(domain_name, None) - LOGGER.info('Removed peer domain {:s}'.format(domain_name)) + LOGGER.debug('domain_uuid_or_name: {:s}'.format(str(domain_uuid_or_name))) + self.peer_domains.pop(domain_uuid_or_name, None) + LOGGER.info('Removed peer domain {:s}'.format(domain_uuid_or_name)) diff --git a/src/interdomain/service/Tools.py b/src/interdomain/service/Tools.py index 94db60ed23b9776793d2c0d15a5394b649154a73..1c8fd90f1b983e66eca04143c3cf94164cdda425 100644 --- a/src/interdomain/service/Tools.py +++ b/src/interdomain/service/Tools.py @@ -13,10 +13,11 @@ # limitations under the License. import json, logging -from typing import List, Optional, Tuple +from typing import List, Optional, Set from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME +from common.DeviceTypes import DeviceTypeEnum from common.proto.context_pb2 import ( - ConfigRule, Constraint, ContextId, Device, Empty, EndPointId, Slice, SliceStatusEnum) + ConfigRule, Constraint, ContextId, Empty, EndPointId, Slice, SliceStatusEnum) from common.tools.context_queries.CheckType import device_type_is_network, endpoint_type_is_border from common.tools.context_queries.InterDomain import get_local_device_uuids from common.tools.grpc.ConfigRules import copy_config_rules @@ -28,27 +29,32 @@ from context.client.ContextClient import ContextClient LOGGER = logging.getLogger(__name__) def compute_slice_owner( - context_client : ContextClient, traversed_domains : List[Tuple[str, Device, bool, List[EndPointId]]] + context_client : ContextClient, traversed_domain_uuids : Set[str] ) -> Optional[str]: - traversed_domain_uuids = {traversed_domain[0] for traversed_domain in traversed_domains} - existing_topologies = context_client.ListTopologies(ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))) - existing_topology_uuids_names = set() + domain_uuids_names = set() DISCARD_TOPOLOGY_NAMES = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} for topology in existing_topologies.topologies: topology_uuid = topology.topology_id.topology_uuid.uuid if topology_uuid in DISCARD_TOPOLOGY_NAMES: continue topology_name = topology.name if topology_name in DISCARD_TOPOLOGY_NAMES: continue - existing_topology_uuids_names.add(topology_uuid) - existing_topology_uuids_names.add(topology_name) + domain_uuids_names.add(topology_uuid) + domain_uuids_names.add(topology_name) + + for topology in existing_topologies.topologies: + topology_details = context_client.GetTopologyDetails(topology.topology_id) + for device in topology_details.devices: + if device.device_type != DeviceTypeEnum.NETWORK.value: continue + domain_uuids_names.discard(device.device_id.device_uuid.uuid) + domain_uuids_names.discard(device.name) - candidate_owner_uuids = traversed_domain_uuids.intersection(existing_topology_uuids_names) + candidate_owner_uuids = traversed_domain_uuids.intersection(domain_uuids_names) if len(candidate_owner_uuids) != 1: data = { - 'traversed_domain_uuids' : [td_uuid for td_uuid in traversed_domain_uuids ], - 'existing_topology_uuids_names': [et_uuid for et_uuid in existing_topology_uuids_names], - 'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ], + 'traversed_domain_uuids': [td_uuid for td_uuid in traversed_domain_uuids], + 'domain_uuids_names' : [et_uuid for et_uuid in domain_uuids_names ], + 'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ], } LOGGER.warning('Unable to identify slice owner: {:s}'.format(json.dumps(data))) return None @@ -56,17 +62,24 @@ def compute_slice_owner( return candidate_owner_uuids.pop() def compose_slice( - context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], constraints : List[Constraint] = [], - config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None + context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], slice_name : Optional[str] = None, + constraints : List[Constraint] = [], config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None, + owner_string : Optional[str] = None ) -> Slice: slice_ = Slice() slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member slice_.slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member slice_.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED # pylint: disable=no-member + if slice_name is not None: + slice_.name = slice_name + if owner_uuid is not None: slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member + if owner_string is not None: + slice_.slice_owner.owner_string = owner_string # pylint: disable=no-member + if len(endpoint_ids) >= 2: slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[0]) # pylint: disable=no-member slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[-1]) # pylint: disable=no-member diff --git a/src/interdomain/service/topology_abstractor/AbstractDevice.py b/src/interdomain/service/topology_abstractor/AbstractDevice.py index 0de93daa8c6e7a77b696cdf437d6e870d33b3666..47832acc02c78b5cfc095fb3ecceccfb6b9a774f 100644 --- a/src/interdomain/service/topology_abstractor/AbstractDevice.py +++ b/src/interdomain/service/topology_abstractor/AbstractDevice.py @@ -24,13 +24,15 @@ from common.tools.context_queries.Device import add_device_to_topology, get_exis from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device, json_device_id from context.client.ContextClient import ContextClient +from context.service.database.uuids.EndPoint import endpoint_get_uuid LOGGER = logging.getLogger(__name__) class AbstractDevice: - def __init__(self, device_uuid : str, device_type : DeviceTypeEnum): + def __init__(self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum): self.__context_client = ContextClient() self.__device_uuid : str = device_uuid + self.__device_name : str = device_name self.__device_type : DeviceTypeEnum = device_type self.__device : Optional[Device] = None self.__device_id : Optional[DeviceId] = None @@ -41,9 +43,23 @@ class AbstractDevice: # Dict[endpoint_uuid, device_uuid] self.__abstract_endpoint_to_device : Dict[str, str] = dict() + def to_json(self) -> Dict: + return { + 'device_uuid' : self.__device_uuid, + 'device_name' : self.__device_name, + 'device_type' : self.__device_type, + 'device' : self.__device, + 'device_id' : self.__device_id, + 'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract, + 'abstract_endpoint_to_device' : self.__abstract_endpoint_to_device, + } + @property def uuid(self) -> str: return self.__device_uuid + @property + def name(self) -> str: return self.__device_name + @property def device_id(self) -> Optional[DeviceId]: return self.__device_id @@ -92,7 +108,7 @@ class AbstractDevice: device = Device(**json_device( device_uuid, self.__device_type.value, DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, - endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] + name=self.__device_name, endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED] )) self.__context_client.SetDevice(device) self.__device = device @@ -126,6 +142,14 @@ class AbstractDevice: self.__abstract_endpoint_to_device\ .setdefault(endpoint_uuid, device_uuid) + def _update_endpoint_name(self, device_uuid : str, endpoint_uuid : str, endpoint_name : str) -> bool: + device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) + interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) + interdomain_endpoint_name = interdomain_endpoint.name + if endpoint_name == interdomain_endpoint_name: return False + interdomain_endpoint.name = endpoint_name + return True + def _update_endpoint_type(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> bool: device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {}) interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid) @@ -134,16 +158,24 @@ class AbstractDevice: interdomain_endpoint.endpoint_type = endpoint_type return True - def _add_endpoint(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> EndPoint: + def _add_endpoint( + self, device_uuid : str, endpoint_uuid : str, endpoint_name : str, endpoint_type : str + ) -> EndPoint: interdomain_endpoint = self.__device.device_endpoints.add() + interdomain_endpoint.endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME + interdomain_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME interdomain_endpoint.endpoint_id.device_id.CopyFrom(self.__device_id) - interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid + interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_name + interdomain_endpoint.name = endpoint_name interdomain_endpoint.endpoint_type = endpoint_type + uuids = endpoint_get_uuid(interdomain_endpoint.endpoint_id, endpoint_name=endpoint_name, allow_random=False) + _, _, interdomain_endpoint_uuid = uuids + self.__device_endpoint_to_abstract\ .setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint) self.__abstract_endpoint_to_device\ - .setdefault(endpoint_uuid, device_uuid) + .setdefault(interdomain_endpoint_uuid, device_uuid) return interdomain_endpoint @@ -160,7 +192,7 @@ class AbstractDevice: device_uuid = device.device_id.device_uuid.uuid device_border_endpoint_uuids = { - endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.endpoint_type + endpoint.endpoint_id.endpoint_uuid.uuid : (endpoint.name, endpoint.endpoint_type) for endpoint in device.device_endpoints if endpoint_type_is_border(endpoint.endpoint_type) } @@ -177,14 +209,15 @@ class AbstractDevice: updated = True # for each border endpoint in device that is not in abstract device; add to abstract device - for endpoint_uuid,endpoint_type in device_border_endpoint_uuids.items(): - # if already added; just check endpoint type is not modified + for endpoint_uuid,(endpoint_name, endpoint_type) in device_border_endpoint_uuids.items(): + # if already added; just check endpoint name and type are not modified if endpoint_uuid in self.__abstract_endpoint_to_device: + updated = updated or self._update_endpoint_name(device_uuid, endpoint_uuid, endpoint_name) updated = updated or self._update_endpoint_type(device_uuid, endpoint_uuid, endpoint_type) continue # otherwise, add it to the abstract device - self._add_endpoint(device_uuid, endpoint_uuid, endpoint_type) + self._add_endpoint(device_uuid, endpoint_uuid, endpoint_name, endpoint_type) updated = True return updated diff --git a/src/interdomain/service/topology_abstractor/AbstractLink.py b/src/interdomain/service/topology_abstractor/AbstractLink.py index bdab62476c709de7d3fa2c4de2dba687714aba77..76b2a0311b2213d35c1b5461e06c324f9304b934 100644 --- a/src/interdomain/service/topology_abstractor/AbstractLink.py +++ b/src/interdomain/service/topology_abstractor/AbstractLink.py @@ -33,6 +33,14 @@ class AbstractLink: # Dict[(device_uuid, endpoint_uuid), abstract EndPointId] self.__device_endpoint_to_abstract : Dict[Tuple[str, str], EndPointId] = dict() + def to_json(self) -> Dict: + return { + 'link_uuid' : self.__link_uuid, + 'link' : self.__link, + 'link_id' : self.__link_id, + 'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract, + } + @property def uuid(self) -> str: return self.__link_uuid @@ -95,6 +103,8 @@ class AbstractLink: def _add_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None: endpoint_id = self.__link.link_endpoint_ids.add() + endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME + endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME endpoint_id.device_id.device_uuid.uuid = device_uuid endpoint_id.endpoint_uuid.uuid = endpoint_uuid self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id) diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py index 40b40ac6604e044af1067308fce2ed0c64d30d44..0d9faa0408fe77dceaf5652b144590f9beb4a88d 100644 --- a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py +++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py @@ -16,15 +16,16 @@ import logging, threading from typing import Dict, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.DeviceTypes import DeviceTypeEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name) from common.proto.context_pb2 import ( ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent) from common.tools.context_queries.CheckType import ( device_type_is_datacenter, device_type_is_network, endpoint_type_is_border) from common.tools.context_queries.Context import create_context -from common.tools.context_queries.Device import get_devices_in_topology, get_uuids_of_devices_in_topology -from common.tools.context_queries.Link import get_links_in_topology +from common.tools.context_queries.Device import get_uuids_of_devices_in_topology #, get_devices_in_topology +#from common.tools.context_queries.Link import get_links_in_topology from common.tools.context_queries.Topology import create_missing_topologies from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id @@ -99,13 +100,13 @@ class TopologyAbstractor(threading.Thread): # return False def _get_or_create_abstract_device( - self, device_uuid : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender, + self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender, abstract_topology_id : TopologyId ) -> AbstractDevice: abstract_device = self.abstract_devices.get(device_uuid) changed = False if abstract_device is None: - abstract_device = AbstractDevice(device_uuid, device_type) + abstract_device = AbstractDevice(device_uuid, device_name, device_type) changed = abstract_device.initialize() if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) self.abstract_devices[device_uuid] = abstract_device @@ -117,16 +118,17 @@ class TopologyAbstractor(threading.Thread): abstract_device_uuid : Optional[str] = None ) -> None: device_uuid = device.device_id.device_uuid.uuid + device_name = device.name if device_type_is_datacenter(device.device_type): abstract_device_uuid = device_uuid abstract_device = self._get_or_create_abstract_device( - device_uuid, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id) + device_uuid, device_name, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id) elif device_type_is_network(device.device_type): LOGGER.warning('device_type is network; not implemented') return else: abstract_device = self._get_or_create_abstract_device( - abstract_device_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + abstract_device_uuid, None, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) self.real_to_abstract_device_uuid[device_uuid] = abstract_device_uuid changed = abstract_device.update_endpoints(device) if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device) @@ -224,11 +226,11 @@ class TopologyAbstractor(threading.Thread): if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link) def update_abstraction(self, event : EventTypes) -> None: - missing_env_vars = find_missing_environment_variables([ + env_vars = find_environment_variables([ get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) - if len(missing_env_vars) == 0: + if len(env_vars) == 2: # DLT available dlt_connector_client = DltConnectorClient() dlt_connector_client.connect() @@ -238,41 +240,55 @@ class TopologyAbstractor(threading.Thread): dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) if isinstance(event, ContextEvent): - LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event))) elif isinstance(event, TopologyEvent): + LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event))) topology_id = event.topology_id topology_uuid = topology_id.topology_uuid.uuid context_id = topology_id.context_id context_uuid = context_id.context_uuid.uuid topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME} - if (context_uuid == DEFAULT_CONTEXT_NAME) and (topology_uuid not in topology_uuids): + + context = self.context_client.GetContext(context_id) + context_name = context.name + + topology_details = self.context_client.GetTopologyDetails(topology_id) + topology_name = topology_details.name + + if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ + (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + abstract_topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=ADMIN_CONTEXT_ID)) self._get_or_create_abstract_device( - topology_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) + topology_uuid, topology_name, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id) - devices = get_devices_in_topology(self.context_client, context_id, topology_uuid) - for device in devices: + #devices = get_devices_in_topology(self.context_client, context_id, topology_uuid) + for device in topology_details.devices: self._update_abstract_device( device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=topology_uuid) - links = get_links_in_topology(self.context_client, context_id, topology_uuid) - for link in links: + #links = get_links_in_topology(self.context_client, context_id, topology_uuid) + for link in topology_details.links: self._update_abstract_link(link, dlt_record_sender, abstract_topology_id) - for device in devices: + for device in topology_details.devices: self._infer_abstract_links(device, dlt_record_sender) else: - LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) - + MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})' + args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event) + LOGGER.warning(MSG.format(*args)) + elif isinstance(event, DeviceEvent): + LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) device_id = event.device_id device_uuid = device_id.device_uuid.uuid abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid) device = self.context_client.GetDevice(device_id) if abstract_device_uuid is None: - LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) else: abstract_topology_id = self.abstract_device_to_topology_id[abstract_device_uuid] self._update_abstract_device( @@ -281,11 +297,12 @@ class TopologyAbstractor(threading.Thread): self._infer_abstract_links(device, dlt_record_sender) elif isinstance(event, LinkEvent): + LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event))) link_id = event.link_id link_uuid = link_id.link_uuid.uuid abstract_link_uuid = self.real_to_abstract_link_uuid.get(link_uuid) if abstract_link_uuid is None: - LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.warning('Ignoring LinkEvent({:s})'.format(grpc_message_to_json_string(event))) else: abstract_topology_id = self.abstract_link_to_topology_id[abstract_link_uuid] link = self.context_client.GetLink(link_id) diff --git a/src/interdomain/tests/test_unitary.py b/src/interdomain/tests/old_tests.py similarity index 99% rename from src/interdomain/tests/test_unitary.py rename to src/interdomain/tests/old_tests.py index 403dea54334da54a794b6da40dd64d1e6e856034..3543c9541371d3e1dd3d81e8f51c62082cec6ec1 100644 --- a/src/interdomain/tests/test_unitary.py +++ b/src/interdomain/tests/old_tests.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - #import logging, grpc #import os #import sqlite3 diff --git a/src/interdomain/tests/test_compute_domains.py b/src/interdomain/tests/test_compute_domains.py new file mode 100644 index 0000000000000000000000000000000000000000..3332731dd53ad62801c80c5172fbfee3ea943c6a --- /dev/null +++ b/src/interdomain/tests/test_compute_domains.py @@ -0,0 +1,119 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, pytest +from typing import Dict, List, Tuple +from common.DeviceTypes import DeviceTypeEnum +from common.proto.context_pb2 import EndPointId +from common.proto.pathcomp_pb2 import PathCompRequest +from common.tools.context_queries.Device import get_device +from common.tools.context_queries.InterDomain import get_device_to_domain_map, get_local_device_uuids +from common.tools.grpc.Tools import grpc_message_list_to_json, grpc_message_list_to_json_string, grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from pathcomp.frontend.client.PathCompClient import PathCompClient + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +@pytest.fixture(scope='session') +def context_client(): + _client = ContextClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def pathcomp_client(): + _client = PathCompClient() + yield _client + _client.close() + +def test_interdomain_topology_abstractor( + context_client : ContextClient, # pylint: disable=redefined-outer-name + pathcomp_client : PathCompClient, # pylint: disable=redefined-outer-name +) -> None: + + pathcomp_req = PathCompRequest(**{ + "services": [ + {"name": "", "service_constraints": [{"sla_capacity": {"capacity_gbps": 10.0}}, {"sla_latency": {"e2e_latency_ms": 100.0}}], "service_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "cda90d2f-e7b0-5837-8f2e-2fb29dd9b367"}}, "endpoint_uuid": {"uuid": "37ab67ef-0064-54e3-ae9b-d40100953834"}, "topology_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "topology_uuid": {"uuid": "c76135e3-24a8-5e92-9bed-c3c9139359c8"}}}, + {"device_id": {"device_uuid": {"uuid": "800d5bd4-a7a3-5a66-82ab-d399767ca3d8"}}, "endpoint_uuid": {"uuid": "97f57787-cfec-5315-9718-7e850905f11a"}, "topology_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "topology_uuid": {"uuid": "c76135e3-24a8-5e92-9bed-c3c9139359c8"}}} + ], "service_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "service_uuid": {"uuid": "77277b43-f9cd-5e01-a3e7-6c5fa4577137"}}, "service_type": "SERVICETYPE_L2NM"} + ], + "shortest_path": {} + }) + pathcomp_req_svc = pathcomp_req.services[0] + + pathcomp_rep = pathcomp_client.Compute(pathcomp_req) + LOGGER.warning('pathcomp_rep = {:s}'.format(grpc_message_to_json_string(pathcomp_rep))) + + num_services = len(pathcomp_rep.services) + if num_services == 0: + raise Exception('No services received : {:s}'.format(grpc_message_to_json_string(pathcomp_rep))) + + num_connections = len(pathcomp_rep.connections) + if num_connections == 0: + raise Exception('No connections received : {:s}'.format(grpc_message_to_json_string(pathcomp_rep))) + + local_device_uuids = get_local_device_uuids(context_client) + LOGGER.warning('local_device_uuids={:s}'.format(str(local_device_uuids))) + + device_to_domain_map = get_device_to_domain_map(context_client) + LOGGER.warning('device_to_domain_map={:s}'.format(str(device_to_domain_map))) + + local_slices : List[List[EndPointId]] = list() + remote_slices : List[List[EndPointId]] = list() + req_service_uuid = pathcomp_req_svc.service_id.service_uuid.uuid + for service in pathcomp_rep.services: + service_uuid = service.service_id.service_uuid.uuid + if service_uuid == req_service_uuid: continue # main synthetic service; we don't care + device_uuids = { + endpoint_id.device_id.device_uuid.uuid + for endpoint_id in service.service_endpoint_ids + } + local_domain_uuids = set() + remote_domain_uuids = set() + for device_uuid in device_uuids: + if device_uuid in local_device_uuids: + domain_uuid = device_to_domain_map.get(device_uuid) + if domain_uuid is None: + raise Exception('Unable to map device({:s}) to a domain'.format(str(device_uuid))) + local_domain_uuids.add(domain_uuid) + else: + device = get_device( + context_client, device_uuid, include_endpoints=True, include_config_rules=False, + include_components=False) + if device is None: raise Exception('Device({:s}) not found'.format(str(device_uuid))) + device_type = DeviceTypeEnum._value2member_map_.get(device.device_type) + is_remote = device_type == DeviceTypeEnum.NETWORK + if not is_remote: + MSG = 'Weird device({:s}) is not local and not network' + raise Exception(MSG.format(grpc_message_to_json_string(device))) + remote_domain_uuids.add(device_uuid) + is_local = len(local_domain_uuids) > 0 + is_remote = len(remote_domain_uuids) > 0 + if is_local == is_remote: + MSG = 'Weird service combines local and remote devices: {:s}' + raise Exception(MSG.format(grpc_message_to_json_string(service))) + elif is_local: + local_slices.append(service.service_endpoint_ids) + else: + remote_slices.append(service.service_endpoint_ids) + + str_local_slices = [grpc_message_list_to_json(endpoint_ids) for endpoint_ids in local_slices] + LOGGER.warning('local_slices={:s}'.format(str(str_local_slices))) + + str_remote_slices = [grpc_message_list_to_json(endpoint_ids) for endpoint_ids in remote_slices] + LOGGER.warning('remote_slices={:s}'.format(str(str_remote_slices))) + + raise Exception() diff --git a/src/interdomain/tests/test_topology_abstractor.py b/src/interdomain/tests/test_topology_abstractor.py new file mode 100644 index 0000000000000000000000000000000000000000..e6243a236c6a2ab38bd5e5325ecdd1668d2b033f --- /dev/null +++ b/src/interdomain/tests/test_topology_abstractor.py @@ -0,0 +1,105 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, pytest, time +from common.Constants import DEFAULT_CONTEXT_NAME +from common.proto.context_pb2 import ContextId, Empty +from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId +from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from interdomain.service.topology_abstractor.TopologyAbstractor import TopologyAbstractor + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +@pytest.fixture(scope='session') +def context_client(): + _client = ContextClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_client(): + _client = DeviceClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def topology_abstractor(): + _topology_abstractor = TopologyAbstractor() + _topology_abstractor.start() + yield _topology_abstractor + _topology_abstractor.stop() + _topology_abstractor.join() + +def test_pre_cleanup_scenario( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name +) -> None: + for link_id in context_client.ListLinkIds(Empty()).link_ids: context_client.RemoveLink(link_id) + for device_id in context_client.ListDeviceIds(Empty()).device_ids: device_client.DeleteDevice(device_id) + + contexts = context_client.ListContexts(Empty()) + for context in contexts.contexts: + assert len(context.slice_ids) == 0, 'Found Slices: {:s}'.format(grpc_message_to_json_string(context)) + assert len(context.service_ids) == 0, 'Found Services: {:s}'.format(grpc_message_to_json_string(context)) + for topology_id in context.topology_ids: context_client.RemoveTopology(topology_id) + context_client.RemoveContext(context.context_id) + +DESCRIPTOR_FILE = 'oeccpsc22/descriptors/domain1.json' +#DESCRIPTOR_FILE = 'oeccpsc22/descriptors/domain2.json' + +def test_interdomain_topology_abstractor( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + topology_abstractor : TopologyAbstractor, # pylint: disable=redefined-outer-name +) -> None: + #validate_empty_scenario(context_client) + + time.sleep(3) + + descriptor_loader = DescriptorLoader( + descriptors_file=DESCRIPTOR_FILE, context_client=context_client, device_client=device_client) + results = descriptor_loader.process() + check_descriptor_load_results(results, descriptor_loader) + #descriptor_loader.validate() + + time.sleep(3) + + LOGGER.warning('real_to_abstract_device_uuid={:s}'.format(str(topology_abstractor.real_to_abstract_device_uuid))) + LOGGER.warning('real_to_abstract_link_uuid={:s}'.format(str(topology_abstractor.real_to_abstract_link_uuid))) + + LOGGER.warning('abstract_device_to_topology_id={:s}'.format(str(topology_abstractor.abstract_device_to_topology_id))) + LOGGER.warning('abstract_link_to_topology_id={:s}'.format(str(topology_abstractor.abstract_link_to_topology_id))) + + LOGGER.warning('abstract_devices={:s}'.format(str({ + k:v.to_json() + for k,v in topology_abstractor.abstract_devices.items() + }))) + LOGGER.warning('abstract_links={:s}'.format(str({ + k:v.to_json() + for k,v in topology_abstractor.abstract_links.items() + }))) + + raise Exception() + + +#def test_post_cleanup_scenario( +# context_client : ContextClient, # pylint: disable=redefined-outer-name +# device_client : DeviceClient, # pylint: disable=redefined-outer-name +#) -> None: +# test_pre_cleanup_scenario(context_client, device_client)