# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, List, Set, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import ContextId, Device, Empty, EndPointId, ServiceTypeEnum, Slice
from common.proto.pathcomp_pb2 import PathCompRequest
from common.tools.context_queries.CheckType import device_type_is_network
from common.tools.context_queries.Device import get_devices_in_topology
from common.tools.context_queries.Topology import get_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient

LOGGER = logging.getLogger(__name__)

# Identifier of the context named DEFAULT_CONTEXT_NAME; used for all local-topology queries below.
ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
DATACENTER_DEVICE_TYPES = {DeviceTypeEnum.DATACENTER, DeviceTypeEnum.EMULATED_DATACENTER}


def get_local_device_uuids(context_client : ContextClient) -> Set[str]:
    """Collect the UUIDs regarded as local devices.

    Lists the topologies of ADMIN_CONTEXT_ID, ignores the one keyed INTERDOMAIN_TOPOLOGY_NAME, and returns the
    union of:
      - the keys of the remaining topologies other than DEFAULT_TOPOLOGY_NAME (used as abstract device names), and
      - the device UUIDs referenced by every remaining topology (DEFAULT_TOPOLOGY_NAME included).

    Args:
        context_client: client used to call ListTopologies.

    Returns:
        Set of local device UUIDs (possibly empty).
    """
    topologies = context_client.ListTopologies(ADMIN_CONTEXT_ID)
    topologies = {topology.topology_id.topology_uuid.uuid : topology for topology in topologies.topologies}
    LOGGER.info('[get_local_device_uuids] topologies.keys()={:s}'.format(str(topologies.keys())))

    local_topology_uuids = set(topologies.keys())
    local_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_NAME)
    LOGGER.info('[get_local_device_uuids] local_topology_uuids={:s}'.format(str(local_topology_uuids)))

    local_device_uuids : Set[str] = set()

    # add topology names except DEFAULT_TOPOLOGY_NAME and INTERDOMAIN_TOPOLOGY_NAME; they are abstracted as a
    # local device in inter-domain and the name of the topology is used as abstract device name
    local_device_uuids.update(
        local_topology_uuid
        for local_topology_uuid in local_topology_uuids
        if local_topology_uuid != DEFAULT_TOPOLOGY_NAME
    )

    # add physical devices in the local topologies
    for local_topology_uuid in local_topology_uuids:
        topology_device_ids = topologies[local_topology_uuid].device_ids
        topology_device_uuids = {device_id.device_uuid.uuid for device_id in topology_device_ids}
        LOGGER.info('[get_local_device_uuids] [loop] local_topology_uuid={:s} topology_device_uuids={:s}'.format(
            str(local_topology_uuid), str(topology_device_uuids)))
        local_device_uuids.update(topology_device_uuids)

    LOGGER.info('[get_local_device_uuids] local_device_uuids={:s}'.format(str(local_device_uuids)))
    return local_device_uuids


def get_interdomain_device_uuids(context_client : ContextClient) -> Set[str]:
    """Return the device UUIDs referenced by the inter-domain topology.

    The topology is looked up as INTERDOMAIN_TOPOLOGY_NAME within DEFAULT_CONTEXT_NAME.

    Args:
        context_client: client handed to get_topology.

    Returns:
        Set of device UUIDs; an empty set (after logging a warning) when get_topology returns None.
    """
    context_uuid = DEFAULT_CONTEXT_NAME
    topology_uuid = INTERDOMAIN_TOPOLOGY_NAME
    interdomain_topology = get_topology(context_client, topology_uuid, context_uuid=context_uuid)
    if interdomain_topology is None:
        MSG = '[get_interdomain_device_uuids] {:s}/{:s} topology not found'
        LOGGER.warning(MSG.format(context_uuid, topology_uuid))
        return set()

    # add abstracted devices in the interdomain topology
    interdomain_device_ids = interdomain_topology.device_ids
    interdomain_device_uuids = {device_id.device_uuid.uuid for device_id in interdomain_device_ids}
    LOGGER.info('[get_interdomain_device_uuids] interdomain_device_uuids={:s}'.format(str(interdomain_device_uuids)))
    return interdomain_device_uuids


def get_local_domain_devices(context_client : ContextClient) -> List[Device]:
    """Return the devices that are both local and accepted by device_type_is_network.

    Args:
        context_client: client used to call ListTopologies (indirectly) and ListDevices.

    Returns:
        Devices from ListDevices, in the order received, whose device_type passes device_type_is_network and
        whose UUID is in get_local_device_uuids().
    """
    local_device_uuids = get_local_device_uuids(context_client)
    all_devices = context_client.ListDevices(Empty())
    return [
        device
        for device in all_devices.devices
        if device_type_is_network(device.device_type)
        and device.device_id.device_uuid.uuid in local_device_uuids
    ]


def is_inter_domain(context_client : ContextClient, endpoint_ids : List[EndPointId]) -> bool:
    """Tell whether every endpoint belongs to a device of the inter-domain topology.

    Args:
        context_client: client used to retrieve the inter-domain device UUIDs.
        endpoint_ids: endpoints to classify.

    Returns:
        True when no endpoint references a device outside the inter-domain topology. Note that this is also
        True for an empty endpoint_ids list.
    """
    interdomain_device_uuids = get_interdomain_device_uuids(context_client)
    LOGGER.info('[is_inter_domain] interdomain_device_uuids={:s}'.format(str(interdomain_device_uuids)))
    non_interdomain_endpoint_ids = [
        endpoint_id
        for endpoint_id in endpoint_ids
        if endpoint_id.device_id.device_uuid.uuid not in interdomain_device_uuids
    ]
    # (device_uuid, endpoint_uuid) pairs are logged instead of the raw protobuf messages for readability
    str_non_interdomain_endpoint_ids = [
        (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
        for endpoint_id in non_interdomain_endpoint_ids
    ]
    LOGGER.info('[is_inter_domain] non_interdomain_endpoint_ids={:s}'.format(str(str_non_interdomain_endpoint_ids)))
    is_inter_domain_ = len(non_interdomain_endpoint_ids) == 0
    LOGGER.info('[is_inter_domain] is_inter_domain={:s}'.format(str(is_inter_domain_)))
    return is_inter_domain_


def is_multi_domain(context_client : ContextClient, endpoint_ids : List[EndPointId]) -> bool:
    """Tell whether at least one endpoint belongs to a device that is not local.

    Args:
        context_client: client used to retrieve the local device UUIDs.
        endpoint_ids: endpoints to classify.

    Returns:
        True when one or more endpoints reference a device UUID absent from get_local_device_uuids().
    """
    local_device_uuids = get_local_device_uuids(context_client)
    LOGGER.info('[is_multi_domain] local_device_uuids={:s}'.format(str(local_device_uuids)))
    remote_endpoint_ids = [
        endpoint_id
        for endpoint_id in endpoint_ids
        if endpoint_id.device_id.device_uuid.uuid not in local_device_uuids
    ]
    # (device_uuid, endpoint_uuid) pairs are logged instead of the raw protobuf messages for readability
    str_remote_endpoint_ids = [
        (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
        for endpoint_id in remote_endpoint_ids
    ]
    LOGGER.info('[is_multi_domain] remote_endpoint_ids={:s}'.format(str(str_remote_endpoint_ids)))
    is_multi_domain_ = len(remote_endpoint_ids) > 0
    LOGGER.info('[is_multi_domain] is_multi_domain={:s}'.format(str(is_multi_domain_)))
    return is_multi_domain_


def compute_interdomain_path(
    pathcomp_client : PathCompClient, slice_ : Slice
) -> List[Tuple[str, List[EndPointId]]]:
    """Request a path for the slice endpoints and group its hops per traversed device.

    A PathCompRequest is built with a single SERVICETYPE_L2NM service that reuses the slice's context UUID and
    slice UUID as service identifier, copies the slice endpoints, and carries two custom constraints
    ('bandwidth[gbps]' = '10.0' and 'latency[ms]' = '100.0').

    Args:
        pathcomp_client: client used to call Compute.
        slice_: slice whose endpoints have to be connected.

    Returns:
        List of (device_uuid, endpoint_ids) tuples, one per distinct device in the computed connection, ordered
        by the first appearance of the device in the path hops.

    Raises:
        Exception: when the reply contains no service, or no connection, matching the requested service id.
    """
    context_uuid = slice_.slice_id.context_id.context_uuid.uuid
    slice_uuid = slice_.slice_id.slice_uuid.uuid

    pathcomp_req = PathCompRequest()
    # NOTE(review): presumably this selects the shortest_path algorithm in the request -- confirm against proto
    pathcomp_req.shortest_path.Clear()                                          # pylint: disable=no-member
    pathcomp_req_svc = pathcomp_req.services.add()                              # pylint: disable=no-member
    pathcomp_req_svc.service_id.context_id.context_uuid.uuid = context_uuid
    pathcomp_req_svc.service_id.service_uuid.uuid = slice_uuid
    pathcomp_req_svc.service_type = ServiceTypeEnum.SERVICETYPE_L2NM

    for endpoint_id in slice_.slice_endpoint_ids:
        service_endpoint_id = pathcomp_req_svc.service_endpoint_ids.add()
        service_endpoint_id.CopyFrom(endpoint_id)

    constraint_bw = pathcomp_req_svc.service_constraints.add()
    constraint_bw.custom.constraint_type = 'bandwidth[gbps]'
    constraint_bw.custom.constraint_value = '10.0'

    constraint_lat = pathcomp_req_svc.service_constraints.add()
    constraint_lat.custom.constraint_type = 'latency[ms]'
    constraint_lat.custom.constraint_value = '100.0'

    LOGGER.info('pathcomp_req = {:s}'.format(grpc_message_to_json_string(pathcomp_req)))
    pathcomp_rep = pathcomp_client.Compute(pathcomp_req)
    LOGGER.info('pathcomp_rep = {:s}'.format(grpc_message_to_json_string(pathcomp_rep)))

    # the service itself is not used further; its presence in the reply is only validated
    service = next((
        service for service in pathcomp_rep.services
        if service.service_id == pathcomp_req_svc.service_id
    ), None)
    if service is None:
        str_service_id = grpc_message_to_json_string(pathcomp_req_svc.service_id)
        raise Exception('Service({:s}) not found'.format(str_service_id))

    connection = next((
        connection for connection in pathcomp_rep.connections
        if connection.service_id == pathcomp_req_svc.service_id
    ), None)
    if connection is None:
        str_service_id = grpc_message_to_json_string(pathcomp_req_svc.service_id)
        raise Exception('Connection for Service({:s}) not found'.format(str_service_id))

    # domain_list keeps the order of first appearance; the dict accumulates the hops of each device
    domain_list : List[str] = list()
    domain_to_endpoint_ids : Dict[str, List[EndPointId]] = dict()
    for endpoint_id in connection.path_hops_endpoint_ids:
        device_uuid = endpoint_id.device_id.device_uuid.uuid
        if device_uuid not in domain_to_endpoint_ids: domain_list.append(device_uuid)
        domain_to_endpoint_ids.setdefault(device_uuid, []).append(endpoint_id)

    return [
        (domain_uuid, domain_to_endpoint_ids[domain_uuid])
        for domain_uuid in domain_list
    ]


def get_device_to_domain_map(context_client : ContextClient) -> Dict[str, str]:
    """Build a mapping from device UUID to the UUID of the domain it is assigned to.

    For the context named DEFAULT_CONTEXT_NAME, every topology other than DEFAULT_TOPOLOGY_NAME and
    INTERDOMAIN_TOPOLOGY_NAME is a domain: the topology UUID maps to itself and each of its devices maps to the
    topology UUID. For any other context, the devices of every topology other than INTERDOMAIN_TOPOLOGY_NAME map
    to the context UUID. Later entries overwrite earlier ones for the same device UUID.

    Args:
        context_client: client used to call ListContexts and ListTopologies.

    Returns:
        Dictionary device_uuid -> domain_uuid.
    """
    devices_to_domains : Dict[str, str] = dict()
    contexts = context_client.ListContexts(Empty())
    for context in contexts.contexts:
        context_id = context.context_id
        context_uuid = context_id.context_uuid.uuid
        topologies = context_client.ListTopologies(context_id)
        if context_uuid == DEFAULT_CONTEXT_NAME:
            for topology in topologies.topologies:
                topology_id = topology.topology_id
                topology_uuid = topology_id.topology_uuid.uuid
                if topology_uuid in {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}: continue

                # add topology names except DEFAULT_TOPOLOGY_NAME and INTERDOMAIN_TOPOLOGY_NAME; they are
                # abstracted as a local device in inter-domain and the name of the topology is used as
                # abstract device name
                devices_to_domains[topology_uuid] = topology_uuid

                # add physical devices in the local topology
                for device_id in topology.device_ids:
                    device_uuid = device_id.device_uuid.uuid
                    devices_to_domains[device_uuid] = topology_uuid
        else:
            # for each topology in a remote context
            for topology in topologies.topologies:
                topology_id = topology.topology_id
                topology_uuid = topology_id.topology_uuid.uuid

                # skip the inter-domain topology; only the other topologies contribute devices
                if topology_uuid in {INTERDOMAIN_TOPOLOGY_NAME}: continue

                # add devices to the remote domain list
                for device_id in topology.device_ids:
                    device_uuid = device_id.device_uuid.uuid
                    devices_to_domains[device_uuid] = context_uuid

    return devices_to_domains


def compute_traversed_domains(
    context_client : ContextClient, interdomain_path : List[Tuple[str, List[EndPointId]]]
) -> List[Tuple[str, bool, List[EndPointId]]]:
    """Merge the per-device hops of an inter-domain path into per-domain entries.

    Each device of the path is translated to its domain through get_device_to_domain_map(); devices missing
    from that map are grouped under the placeholder domain '---'. Endpoints of devices sharing a domain are
    concatenated into a single entry, even when those devices are not consecutive in the path.

    Args:
        context_client: client used for the context queries.
        interdomain_path: list of (device_uuid, endpoint_ids), e.g. as returned by compute_interdomain_path().

    Returns:
        List of (domain_uuid, is_local_domain, endpoint_ids) tuples ordered by first appearance of the domain;
        is_local_domain is True when domain_uuid is in get_local_device_uuids().
    """
    local_device_uuids = get_local_device_uuids(context_client)
    LOGGER.info('[compute_traversed_domains] local_device_uuids={:s}'.format(str(local_device_uuids)))

    # NOTE(review): interdomain_devices is never read below; the query is kept to avoid altering the calls
    # issued by this function -- confirm whether it can be dropped.
    interdomain_devices = get_devices_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME)
    interdomain_devices = {
        device.device_id.device_uuid.uuid : device
        for device in interdomain_devices
    }

    devices_to_domains = get_device_to_domain_map(context_client)
    LOGGER.info('[compute_traversed_domains] devices_to_domains={:s}'.format(str(devices_to_domains)))

    # traversed_domains preserves order; domains_dict indexes the same tuples for quick lookup by domain
    traversed_domains : List[Tuple[str, bool, List[EndPointId]]] = list()
    domains_dict : Dict[str, Tuple[str, bool, List[EndPointId]]] = dict()
    for device_uuid, endpoint_ids in interdomain_path:
        domain_uuid = devices_to_domains.get(device_uuid, '---')
        domain = domains_dict.get(domain_uuid)
        if domain is None:
            is_local_domain = domain_uuid in local_device_uuids
            domain = (domain_uuid, is_local_domain, [])
            traversed_domains.append(domain)
            domains_dict[domain_uuid] = domain
        # the tuple is shared by both containers, so extending its list updates traversed_domains as well
        domain[2].extend(endpoint_ids)

    str_traversed_domains = [
        (domain_uuid, is_local_domain, [
            (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
            for endpoint_id in endpoint_ids
        ])
        for domain_uuid, is_local_domain, endpoint_ids in traversed_domains
    ]
    LOGGER.info('[compute_traversed_domains] traversed_domains={:s}'.format(str(str_traversed_domains)))
    return traversed_domains