Loading src/interdomain/service/InterdomainServiceServicerImpl.py +64 −9 Original line number Diff line number Diff line Loading @@ -13,18 +13,23 @@ # limitations under the License. import grpc, logging, uuid from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, TeraFlowController from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.tools.context_queries.Context import create_context from common.tools.context_queries.InterDomain import ( compute_interdomain_path, compute_traversed_domains, is_multi_domain) compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain, is_multi_domain) from common.tools.context_queries.Topology import create_topology from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from dlt.connector.client.DltConnectorClient import DltConnectorClient from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender from pathcomp.frontend.client.PathCompClient import PathCompClient from slice.client.SliceClient import SliceClient from .RemoteDomainClients import RemoteDomainClients from .Tools import compose_slice, compute_slice_owner from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real LOGGER = logging.getLogger(__name__) Loading @@ -43,34 +48,84 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): context_client = ContextClient() pathcomp_client = PathCompClient() slice_client = SliceClient() if not is_multi_domain(context_client, request.slice_endpoint_ids): dlt_connector_client = DltConnectorClient() local_device_uuids = get_local_device_uuids(context_client) slice_owner_uuid = request.slice_owner.owner_uuid.uuid not_inter_domain = not is_inter_domain(context_client, request.slice_endpoint_ids) no_slice_owner = len(slice_owner_uuid) == 0 is_local_slice_owner = slice_owner_uuid in local_device_uuids if not_inter_domain and (no_slice_owner or is_local_slice_owner): str_slice = grpc_message_to_json_string(request) raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice)) interdomain_path = compute_interdomain_path(pathcomp_client, request) str_interdomain_path = [ [device_uuid, [ (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) for endpoint_id in endpoint_ids ]] for device_uuid, endpoint_ids in interdomain_path ] LOGGER.info('interdomain_path={:s}'.format(str(str_interdomain_path))) traversed_domains = compute_traversed_domains(context_client, interdomain_path) str_traversed_domains = [ (domain_uuid, is_local_domain, [ (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) for endpoint_id in endpoint_ids ]) for domain_uuid,is_local_domain,endpoint_ids in traversed_domains ] LOGGER.info('traversed_domains={:s}'.format(str(str_traversed_domains))) slice_owner_uuid = compute_slice_owner(context_client, traversed_domains) LOGGER.info('slice_owner_uuid={:s}'.format(str(slice_owner_uuid))) if slice_owner_uuid is None: raise Exception('Unable to identify slice owner') reply = Slice() reply.CopyFrom(request) for domain_uuid, _, is_local_domain, endpoint_ids in traversed_domains: dlt_record_sender = DltRecordSender(context_client, dlt_connector_client) for domain_uuid, is_local_domain, endpoint_ids in traversed_domains: slice_uuid = str(uuid.uuid4()) LOGGER.info('[loop] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format( str(domain_uuid), str(is_local_domain), str(slice_uuid))) if is_local_domain: context_uuid = request.slice_id.context_id.context_uuid.uuid sub_slice = compose_slice(context_uuid, slice_uuid, endpoint_ids) # local slices always in DEFAULT_CONTEXT_UUID #context_uuid = request.slice_id.context_id.context_uuid.uuid context_uuid = DEFAULT_CONTEXT_UUID endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids) sub_slice = compose_slice( context_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, config_rules=request.slice_config.config_rules) LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = slice_client.CreateSlice(sub_slice) else: # create context/topology for the remote domains where we are creating slices create_context(context_client, domain_uuid) sub_slice = compose_slice(domain_uuid, slice_uuid, endpoint_ids, slice_owner_uuid) create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID) sub_slice = compose_slice( domain_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, config_rules=request.slice_config.config_rules, owner_uuid=slice_owner_uuid) LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = context_client.SetSlice(sub_slice) topology_id = TopologyId(**json_topology_id(domain_uuid)) dlt_record_sender.add_slice(topology_id, sub_slice) LOGGER.info('[loop] adding sub-slice') reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member LOGGER.info('Recording Remote Slice requests to DLT') dlt_record_sender.commit() LOGGER.info('Activating interdomain slice') reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member LOGGER.info('Updating interdomain slice') slice_id = context_client.SetSlice(reply) return slice_id Loading src/interdomain/service/Tools.py +77 −10 Original line number Diff line number Diff line Loading @@ -15,9 +15,13 @@ import json, logging from typing import List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.proto.context_pb2 import ContextId, Device, EndPointId, Slice, SliceStatusEnum from common.tools.context_queries.InterDomain import get_local_domain_devices from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import ( ConfigRule, Constraint, ContextId, Device, Empty, EndPointId, Slice, SliceStatusEnum) from common.tools.context_queries.CheckType import device_type_is_network, endpoint_type_is_border from common.tools.context_queries.InterDomain import get_local_device_uuids from common.tools.grpc.ConfigRules import copy_config_rules from common.tools.grpc.Constraints import copy_constraints from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from context.client.ContextClient import ContextClient Loading Loading @@ -48,7 +52,8 @@ def compute_slice_owner( return candidate_owner_uuids.pop() def compose_slice( context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], owner_uuid : Optional[str] = None context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], constraints : List[Constraint] = [], config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None ) -> Slice: slice_ = Slice() slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member Loading @@ -58,7 +63,69 @@ def compose_slice( if owner_uuid is not None: slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member for endpoint_id in endpoint_ids: slice_.slice_endpoint_ids.append(endpoint_id) # pylint: disable=no-member if len(endpoint_ids) >= 2: slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[0]) # pylint: disable=no-member slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[-1]) # pylint: disable=no-member if len(constraints) > 0: copy_constraints(constraints, slice_.slice_constraints) # pylint: disable=no-member if len(config_rules) > 0: copy_config_rules(config_rules, slice_.slice_config.config_rules) # pylint: disable=no-member return slice_ def map_abstract_endpoints_to_real( context_client : ContextClient, local_domain_uuid : str, abstract_endpoint_ids : List[EndPointId] ) -> List[EndPointId]: local_device_uuids = get_local_device_uuids(context_client) all_devices = context_client.ListDevices(Empty()) map_endpoints_to_devices = dict() for device in all_devices.devices: LOGGER.info('[map_abstract_endpoints_to_real] Checking device {:s}'.format( grpc_message_to_json_string(device))) if device_type_is_network(device.device_type): LOGGER.info('[map_abstract_endpoints_to_real] Ignoring network device') continue device_uuid = device.device_id.device_uuid.uuid if device_uuid not in local_device_uuids: LOGGER.info('[map_abstract_endpoints_to_real] Ignoring non-local device') continue for endpoint in device.device_endpoints: LOGGER.info('[map_abstract_endpoints_to_real] Checking endpoint {:s}'.format( grpc_message_to_json_string(endpoint))) endpoint_id = endpoint.endpoint_id device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_uuid = endpoint_id.endpoint_uuid.uuid map_endpoints_to_devices[(device_uuid, endpoint_uuid)] = endpoint_id if endpoint_type_is_border(endpoint.endpoint_type): map_endpoints_to_devices[(local_domain_uuid, endpoint_uuid)] = endpoint_id LOGGER.info('[map_abstract_endpoints_to_real] map_endpoints_to_devices={:s}'.format( str({ endpoint_tuple:grpc_message_to_json(endpoint_id) for endpoint_tuple,endpoint_id in map_endpoints_to_devices.items() }))) # map abstract device/endpoints to real device/endpoints real_endpoint_ids = [] for endpoint_id in abstract_endpoint_ids: LOGGER.info('[map_abstract_endpoints_to_real] Mapping endpoint_id {:s} ...'.format( grpc_message_to_json_string(endpoint_id))) device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_uuid = endpoint_id.endpoint_uuid.uuid _endpoint_id = map_endpoints_to_devices.get((device_uuid, endpoint_uuid)) if _endpoint_id is None: LOGGER.warning('map_endpoints_to_devices={:s}'.format(str(map_endpoints_to_devices))) MSG = 'Unable to map abstract EndPoint({:s}) to real one.' raise Exception(MSG.format(grpc_message_to_json_string(endpoint_id))) LOGGER.info('[map_abstract_endpoints_to_real] ... to endpoint_id {:s}'.format( grpc_message_to_json_string(_endpoint_id))) real_endpoint_ids.append(_endpoint_id) return real_endpoint_ids src/interdomain/service/__main__.py +2 −2 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, get_service_port_grpc, wait_for_environment_variables) wait_for_environment_variables) from .topology_abstractor.TopologyAbstractor import TopologyAbstractor from .InterdomainService import InterdomainService from .RemoteDomainClients import RemoteDomainClients Loading @@ -33,7 +33,7 @@ def main(): global LOGGER # pylint: disable=global-statement log_level = get_log_level() logging.basicConfig(level=log_level) logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ Loading Loading
src/interdomain/service/InterdomainServiceServicerImpl.py +64 −9 Original line number Diff line number Diff line Loading @@ -13,18 +13,23 @@ # limitations under the License. import grpc, logging, uuid from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, TeraFlowController from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.tools.context_queries.Context import create_context from common.tools.context_queries.InterDomain import ( compute_interdomain_path, compute_traversed_domains, is_multi_domain) compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain, is_multi_domain) from common.tools.context_queries.Topology import create_topology from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from dlt.connector.client.DltConnectorClient import DltConnectorClient from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender from pathcomp.frontend.client.PathCompClient import PathCompClient from slice.client.SliceClient import SliceClient from .RemoteDomainClients import RemoteDomainClients from .Tools import compose_slice, compute_slice_owner from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real LOGGER = logging.getLogger(__name__) Loading @@ -43,34 +48,84 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): context_client = ContextClient() pathcomp_client = PathCompClient() slice_client = SliceClient() if not is_multi_domain(context_client, request.slice_endpoint_ids): dlt_connector_client = DltConnectorClient() local_device_uuids = get_local_device_uuids(context_client) slice_owner_uuid = request.slice_owner.owner_uuid.uuid not_inter_domain = not is_inter_domain(context_client, request.slice_endpoint_ids) no_slice_owner = len(slice_owner_uuid) == 0 is_local_slice_owner = slice_owner_uuid in local_device_uuids if not_inter_domain and (no_slice_owner or is_local_slice_owner): str_slice = grpc_message_to_json_string(request) raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice)) interdomain_path = compute_interdomain_path(pathcomp_client, request) str_interdomain_path = [ [device_uuid, [ (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) for endpoint_id in endpoint_ids ]] for device_uuid, endpoint_ids in interdomain_path ] LOGGER.info('interdomain_path={:s}'.format(str(str_interdomain_path))) traversed_domains = compute_traversed_domains(context_client, interdomain_path) str_traversed_domains = [ (domain_uuid, is_local_domain, [ (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid) for endpoint_id in endpoint_ids ]) for domain_uuid,is_local_domain,endpoint_ids in traversed_domains ] LOGGER.info('traversed_domains={:s}'.format(str(str_traversed_domains))) slice_owner_uuid = compute_slice_owner(context_client, traversed_domains) LOGGER.info('slice_owner_uuid={:s}'.format(str(slice_owner_uuid))) if slice_owner_uuid is None: raise Exception('Unable to identify slice owner') reply = Slice() reply.CopyFrom(request) for domain_uuid, _, is_local_domain, endpoint_ids in traversed_domains: dlt_record_sender = DltRecordSender(context_client, dlt_connector_client) for domain_uuid, is_local_domain, endpoint_ids in traversed_domains: slice_uuid = str(uuid.uuid4()) LOGGER.info('[loop] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format( str(domain_uuid), str(is_local_domain), str(slice_uuid))) if is_local_domain: context_uuid = request.slice_id.context_id.context_uuid.uuid sub_slice = compose_slice(context_uuid, slice_uuid, endpoint_ids) # local slices always in DEFAULT_CONTEXT_UUID #context_uuid = request.slice_id.context_id.context_uuid.uuid context_uuid = DEFAULT_CONTEXT_UUID endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids) sub_slice = compose_slice( context_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, config_rules=request.slice_config.config_rules) LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = slice_client.CreateSlice(sub_slice) else: # create context/topology for the remote domains where we are creating slices create_context(context_client, domain_uuid) sub_slice = compose_slice(domain_uuid, slice_uuid, endpoint_ids, slice_owner_uuid) create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID) sub_slice = compose_slice( domain_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints, config_rules=request.slice_config.config_rules, owner_uuid=slice_owner_uuid) LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice))) sub_slice_id = context_client.SetSlice(sub_slice) topology_id = TopologyId(**json_topology_id(domain_uuid)) dlt_record_sender.add_slice(topology_id, sub_slice) LOGGER.info('[loop] adding sub-slice') reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member LOGGER.info('Recording Remote Slice requests to DLT') dlt_record_sender.commit() LOGGER.info('Activating interdomain slice') reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member LOGGER.info('Updating interdomain slice') slice_id = context_client.SetSlice(reply) return slice_id Loading
src/interdomain/service/Tools.py +77 −10 Original line number Diff line number Diff line Loading @@ -15,9 +15,13 @@ import json, logging from typing import List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.proto.context_pb2 import ContextId, Device, EndPointId, Slice, SliceStatusEnum from common.tools.context_queries.InterDomain import get_local_domain_devices from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.context_pb2 import ( ConfigRule, Constraint, ContextId, Device, Empty, EndPointId, Slice, SliceStatusEnum) from common.tools.context_queries.CheckType import device_type_is_network, endpoint_type_is_border from common.tools.context_queries.InterDomain import get_local_device_uuids from common.tools.grpc.ConfigRules import copy_config_rules from common.tools.grpc.Constraints import copy_constraints from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from context.client.ContextClient import ContextClient Loading Loading @@ -48,7 +52,8 @@ def compute_slice_owner( return candidate_owner_uuids.pop() def compose_slice( context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], owner_uuid : Optional[str] = None context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], constraints : List[Constraint] = [], config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None ) -> Slice: slice_ = Slice() slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member Loading @@ -58,7 +63,69 @@ def compose_slice( if owner_uuid is not None: slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member for endpoint_id in endpoint_ids: slice_.slice_endpoint_ids.append(endpoint_id) # pylint: disable=no-member if len(endpoint_ids) >= 2: slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[0]) # pylint: disable=no-member slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[-1]) # pylint: disable=no-member if len(constraints) > 0: copy_constraints(constraints, slice_.slice_constraints) # pylint: disable=no-member if len(config_rules) > 0: copy_config_rules(config_rules, slice_.slice_config.config_rules) # pylint: disable=no-member return slice_ def map_abstract_endpoints_to_real( context_client : ContextClient, local_domain_uuid : str, abstract_endpoint_ids : List[EndPointId] ) -> List[EndPointId]: local_device_uuids = get_local_device_uuids(context_client) all_devices = context_client.ListDevices(Empty()) map_endpoints_to_devices = dict() for device in all_devices.devices: LOGGER.info('[map_abstract_endpoints_to_real] Checking device {:s}'.format( grpc_message_to_json_string(device))) if device_type_is_network(device.device_type): LOGGER.info('[map_abstract_endpoints_to_real] Ignoring network device') continue device_uuid = device.device_id.device_uuid.uuid if device_uuid not in local_device_uuids: LOGGER.info('[map_abstract_endpoints_to_real] Ignoring non-local device') continue for endpoint in device.device_endpoints: LOGGER.info('[map_abstract_endpoints_to_real] Checking endpoint {:s}'.format( grpc_message_to_json_string(endpoint))) endpoint_id = endpoint.endpoint_id device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_uuid = endpoint_id.endpoint_uuid.uuid map_endpoints_to_devices[(device_uuid, endpoint_uuid)] = endpoint_id if endpoint_type_is_border(endpoint.endpoint_type): map_endpoints_to_devices[(local_domain_uuid, endpoint_uuid)] = endpoint_id LOGGER.info('[map_abstract_endpoints_to_real] map_endpoints_to_devices={:s}'.format( str({ endpoint_tuple:grpc_message_to_json(endpoint_id) for endpoint_tuple,endpoint_id in map_endpoints_to_devices.items() }))) # map abstract device/endpoints to real device/endpoints real_endpoint_ids = [] for endpoint_id in abstract_endpoint_ids: LOGGER.info('[map_abstract_endpoints_to_real] Mapping endpoint_id {:s} ...'.format( grpc_message_to_json_string(endpoint_id))) device_uuid = endpoint_id.device_id.device_uuid.uuid endpoint_uuid = endpoint_id.endpoint_uuid.uuid _endpoint_id = map_endpoints_to_devices.get((device_uuid, endpoint_uuid)) if _endpoint_id is None: LOGGER.warning('map_endpoints_to_devices={:s}'.format(str(map_endpoints_to_devices))) MSG = 'Unable to map abstract EndPoint({:s}) to real one.' raise Exception(MSG.format(grpc_message_to_json_string(endpoint_id))) LOGGER.info('[map_abstract_endpoints_to_real] ... to endpoint_id {:s}'.format( grpc_message_to_json_string(_endpoint_id))) real_endpoint_ids.append(_endpoint_id) return real_endpoint_ids
src/interdomain/service/__main__.py +2 −2 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, get_service_port_grpc, wait_for_environment_variables) wait_for_environment_variables) from .topology_abstractor.TopologyAbstractor import TopologyAbstractor from .InterdomainService import InterdomainService from .RemoteDomainClients import RemoteDomainClients Loading @@ -33,7 +33,7 @@ def main(): global LOGGER # pylint: disable=global-statement log_level = get_log_level() logging.basicConfig(level=log_level) logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ Loading