From 608093ed299affef3fdd5e018234d66c12cc0d14 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Tue, 8 Nov 2022 07:54:56 +0000 Subject: [PATCH] Interdomain Component: - backed up old servicer - updated to use new generic Context queries - added pathcomp as dependency - updated logic to validate inter-domain requests - implemented inter-domain pathcomp request - updated logic to create per-domain slices --- src/interdomain/Dockerfile | 1 + .../service/InterdomainServiceServicerImpl.py | 115 ++++--------- src/interdomain/service/Tools.py | 64 ++++++++ src/interdomain/service/__main__.py | 14 +- .../InterdomainServiceServicerImpl.py | 153 ++++++++++++++++++ .../topology_abstractor/AbstractDevice.py | 6 +- .../topology_abstractor/AbstractLink.py | 2 +- .../service/topology_abstractor/Tools.py | 147 ----------------- .../topology_abstractor/TopologyAbstractor.py | 9 +- 9 files changed, 269 insertions(+), 242 deletions(-) create mode 100644 src/interdomain/service/Tools.py create mode 100644 src/interdomain/service/_old_code/InterdomainServiceServicerImpl.py delete mode 100644 src/interdomain/service/topology_abstractor/Tools.py diff --git a/src/interdomain/Dockerfile b/src/interdomain/Dockerfile index 036890dc4..ee1071896 100644 --- a/src/interdomain/Dockerfile +++ b/src/interdomain/Dockerfile @@ -67,6 +67,7 @@ COPY src/context/. context/ COPY src/dlt/. dlt/ COPY src/interdomain/. interdomain/ #COPY src/monitoring/. monitoring/ +COPY src/pathcomp/. pathcomp/ #COPY src/service/. service/ COPY src/slice/. slice/ diff --git a/src/interdomain/service/InterdomainServiceServicerImpl.py b/src/interdomain/service/InterdomainServiceServicerImpl.py index 01ba90ef5..604089292 100644 --- a/src/interdomain/service/InterdomainServiceServicerImpl.py +++ b/src/interdomain/service/InterdomainServiceServicerImpl.py @@ -12,15 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging -from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from common.proto.context_pb2 import ( - AuthenticationResult, Slice, SliceId, SliceStatus, SliceStatusEnum, TeraFlowController) +import grpc, logging, uuid +from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, TeraFlowController from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer -#from common.tools.grpc.Tools import grpc_message_to_json_string +from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from common.tools.context_queries.InterDomain import ( + compute_interdomain_path, compute_traversed_domains, is_multi_domain) +from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient -from interdomain.service.RemoteDomainClients import RemoteDomainClients +from pathcomp.frontend.client.PathCompClient import PathCompClient from slice.client.SliceClient import SliceClient +from .RemoteDomainClients import RemoteDomainClients +from .Tools import compose_slice, compute_slice_owner LOGGER = logging.getLogger(__name__) @@ -37,89 +40,37 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): @safe_and_metered_rpc_method(METRICS, LOGGER) def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: context_client = ContextClient() + pathcomp_client = PathCompClient() slice_client = SliceClient() - domains_to_endpoints = {} - local_domain_uuid = None - for slice_endpoint_id in request.slice_endpoint_ids: - device_uuid = slice_endpoint_id.device_id.device_uuid.uuid - domain_uuid = device_uuid.split('@')[1] - endpoints = domains_to_endpoints.setdefault(domain_uuid, []) - endpoints.append(slice_endpoint_id) - if local_domain_uuid is None: local_domain_uuid = domain_uuid + if not is_multi_domain(context_client, request.slice_endpoint_ids): + str_slice = grpc_message_to_json_string(request) + raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice)) + + interdomain_path = compute_interdomain_path(pathcomp_client, request) + traversed_domains = compute_traversed_domains(context_client, interdomain_path) + slice_owner_uuid = compute_slice_owner(context_client, traversed_domains) + if slice_owner_uuid is None: + raise Exception('Unable to identify slice owner') reply = Slice() reply.CopyFrom(request) - # decompose remote slices - for domain_uuid, slice_endpoint_ids in domains_to_endpoints.items(): - if domain_uuid == local_domain_uuid: continue - - remote_slice_request = Slice() - remote_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid - remote_slice_request.slice_id.slice_uuid.uuid = \ - request.slice_id.slice_uuid.uuid + ':subslice@' + local_domain_uuid - remote_slice_request.slice_status.slice_status = request.slice_status.slice_status - for endpoint_id in slice_endpoint_ids: - slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.device_id.device_uuid.uuid = endpoint_id.device_id.device_uuid.uuid - slice_endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid - - # add endpoint connecting to remote domain - if domain_uuid == 'D1': - slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' - slice_endpoint_id.endpoint_uuid.uuid = '2/1' - elif domain_uuid == 'D2': - slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' - slice_endpoint_id.endpoint_uuid.uuid = '2/1' - - interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow') - remote_slice_reply = interdomain_client.LookUpSlice(remote_slice_request) - if remote_slice_reply == remote_slice_request.slice_id: # pylint: disable=no-member - # successful case - remote_slice = interdomain_client.OrderSliceFromCatalog(remote_slice_request) - if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: - raise Exception('Remote Slice creation failed. Wrong Slice status returned') + for domain_uuid, _, is_local_domain, endpoint_ids in traversed_domains: + slice_uuid = str(uuid.uuid4()) + + if is_local_domain: + context_uuid = request.slice_id.context_id.context_uuid.uuid + sub_slice = compose_slice(context_uuid, slice_uuid, endpoint_ids) + sub_slice_id = slice_client.CreateSlice(sub_slice) else: - # not in catalog - remote_slice = interdomain_client.CreateSliceAndAddToCatalog(remote_slice_request) - if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: - raise Exception('Remote Slice creation failed. Wrong Slice status returned') - - #context_client.SetSlice(remote_slice) - #subslice_id = reply.slice_subslice_ids.add() - #subslice_id.CopyFrom(remote_slice.slice_id) - - local_slice_request = Slice() - local_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid - local_slice_request.slice_id.slice_uuid.uuid = request.slice_id.slice_uuid.uuid + ':subslice' - local_slice_request.slice_status.slice_status = request.slice_status.slice_status - for endpoint_id in domains_to_endpoints[local_domain_uuid]: - slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.CopyFrom(endpoint_id) - - # add endpoint connecting to remote domain - if local_domain_uuid == 'D1': - slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' - slice_endpoint_id.endpoint_uuid.uuid = '2/1' - elif local_domain_uuid == 'D2': - slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() - slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' - slice_endpoint_id.endpoint_uuid.uuid = '2/1' - - local_slice_reply = slice_client.CreateSlice(local_slice_request) - if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member - raise Exception('Local Slice creation failed. Wrong Slice Id was returned') - - subslice_id = reply.slice_subslice_ids.add() - subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid - subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid - - context_client.SetSlice(reply) - return reply.slice_id + sub_slice = compose_slice(domain_uuid, slice_uuid, endpoint_ids, slice_owner_uuid) + sub_slice_id = context_client.SetSlice(sub_slice) + + reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member + + slice_id = context_client.SetSlice(reply) + return slice_id @safe_and_metered_rpc_method(METRICS, LOGGER) def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult: diff --git a/src/interdomain/service/Tools.py b/src/interdomain/service/Tools.py new file mode 100644 index 000000000..8a770364c --- /dev/null +++ b/src/interdomain/service/Tools.py @@ -0,0 +1,64 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import List, Optional, Tuple +from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID +from common.proto.context_pb2 import ContextId, Device, EndPointId, Slice, SliceStatusEnum +from common.tools.context_queries.InterDomain import get_local_domain_devices +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) + +def compute_slice_owner( + context_client : ContextClient, traversed_domains : List[Tuple[str, Device, bool, List[EndPointId]]] +) -> Optional[str]: + traversed_domain_uuids = {traversed_domain[0] for traversed_domain in traversed_domains} + + existing_topology_ids = context_client.ListTopologyIds(ContextId(**json_context_id(DEFAULT_CONTEXT_UUID))) + existing_topology_uuids = { + topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids + } + existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID) + existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID) + + candidate_owner_uuids = traversed_domain_uuids.intersection(existing_topology_uuids) + if len(candidate_owner_uuids) != 1: + data = { + 'traversed_domain_uuids' : [td_uuid for td_uuid in traversed_domain_uuids ], + 'existing_topology_uuids': [et_uuid for et_uuid in existing_topology_uuids], + 'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ], + } + LOGGER.warning('Unable to identify slice owner: {:s}'.format(json.dumps(data))) + return None + + return candidate_owner_uuids.pop() + +def compose_slice( + context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], owner_uuid : Optional[str] = None +) -> Slice: + slice_ = Slice() + slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member + slice_.slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member + slice_.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED # pylint: disable=no-member + + if owner_uuid is not None: + slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member + + for endpoint_id in endpoint_ids: + slice_.slice_endpoint_ids.append(endpoint_id) # pylint: disable=no-member + + return slice_ diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index b5463d7d8..ebfade0d7 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -37,12 +37,14 @@ def main(): LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) diff --git a/src/interdomain/service/_old_code/InterdomainServiceServicerImpl.py b/src/interdomain/service/_old_code/InterdomainServiceServicerImpl.py new file mode 100644 index 000000000..01ba90ef5 --- /dev/null +++ b/src/interdomain/service/_old_code/InterdomainServiceServicerImpl.py @@ -0,0 +1,153 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from common.proto.context_pb2 import ( + AuthenticationResult, Slice, SliceId, SliceStatus, SliceStatusEnum, TeraFlowController) +from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer +#from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from interdomain.service.RemoteDomainClients import RemoteDomainClients +from slice.client.SliceClient import SliceClient + +LOGGER = logging.getLogger(__name__) + +SERVICE_NAME = 'Interdomain' +METHOD_NAMES = ['RequestSlice', 'Authenticate', 'LookUpSlice', 'OrderSliceFromCatalog', 'CreateSliceAndAddToCatalog'] +METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) + +class InterdomainServiceServicerImpl(InterdomainServiceServicer): + def __init__(self, remote_domain_clients : RemoteDomainClients): + LOGGER.debug('Creating Servicer...') + self.remote_domain_clients = remote_domain_clients + LOGGER.debug('Servicer Created') + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + context_client = ContextClient() + slice_client = SliceClient() + + domains_to_endpoints = {} + local_domain_uuid = None + for slice_endpoint_id in request.slice_endpoint_ids: + device_uuid = slice_endpoint_id.device_id.device_uuid.uuid + domain_uuid = device_uuid.split('@')[1] + endpoints = domains_to_endpoints.setdefault(domain_uuid, []) + endpoints.append(slice_endpoint_id) + if local_domain_uuid is None: local_domain_uuid = domain_uuid + + reply = Slice() + reply.CopyFrom(request) + + # decompose remote slices + for domain_uuid, slice_endpoint_ids in domains_to_endpoints.items(): + if domain_uuid == local_domain_uuid: continue + + remote_slice_request = Slice() + remote_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid + remote_slice_request.slice_id.slice_uuid.uuid = \ + request.slice_id.slice_uuid.uuid + ':subslice@' + local_domain_uuid + remote_slice_request.slice_status.slice_status = request.slice_status.slice_status + for endpoint_id in slice_endpoint_ids: + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = endpoint_id.device_id.device_uuid.uuid + slice_endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid + + # add endpoint connecting to remote domain + if domain_uuid == 'D1': + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + elif domain_uuid == 'D2': + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + + interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow') + remote_slice_reply = interdomain_client.LookUpSlice(remote_slice_request) + if remote_slice_reply == remote_slice_request.slice_id: # pylint: disable=no-member + # successful case + remote_slice = interdomain_client.OrderSliceFromCatalog(remote_slice_request) + if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: + raise Exception('Remote Slice creation failed. Wrong Slice status returned') + else: + # not in catalog + remote_slice = interdomain_client.CreateSliceAndAddToCatalog(remote_slice_request) + if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: + raise Exception('Remote Slice creation failed. Wrong Slice status returned') + + #context_client.SetSlice(remote_slice) + #subslice_id = reply.slice_subslice_ids.add() + #subslice_id.CopyFrom(remote_slice.slice_id) + + local_slice_request = Slice() + local_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid + local_slice_request.slice_id.slice_uuid.uuid = request.slice_id.slice_uuid.uuid + ':subslice' + local_slice_request.slice_status.slice_status = request.slice_status.slice_status + for endpoint_id in domains_to_endpoints[local_domain_uuid]: + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.CopyFrom(endpoint_id) + + # add endpoint connecting to remote domain + if local_domain_uuid == 'D1': + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + elif local_domain_uuid == 'D2': + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + + local_slice_reply = slice_client.CreateSlice(local_slice_request) + if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member + raise Exception('Local Slice creation failed. Wrong Slice Id was returned') + + subslice_id = reply.slice_subslice_ids.add() + subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid + subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid + + context_client.SetSlice(reply) + return reply.slice_id + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult: + auth_result = AuthenticationResult() + auth_result.context_id.CopyFrom(request.context_id) # pylint: disable=no-member + auth_result.authenticated = True + return auth_result + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + try: + context_client = ContextClient() + slice_ = context_client.GetSlice(request.slice_id) + return slice_.slice_id + except grpc.RpcError: + #LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id))) + return SliceId() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + raise NotImplementedError('OrderSliceFromCatalog') + #return Slice() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + context_client = ContextClient() + slice_client = SliceClient() + reply = slice_client.CreateSlice(request) + if reply != request.slice_id: # pylint: disable=no-member + raise Exception('Slice creation failed. Wrong Slice Id was returned') + return context_client.GetSlice(request.slice_id) diff --git a/src/interdomain/service/topology_abstractor/AbstractDevice.py b/src/interdomain/service/topology_abstractor/AbstractDevice.py index 443fc18cd..3448c1036 100644 --- a/src/interdomain/service/topology_abstractor/AbstractDevice.py +++ b/src/interdomain/service/topology_abstractor/AbstractDevice.py @@ -18,12 +18,12 @@ from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.DeviceTypes import DeviceTypeEnum from common.proto.context_pb2 import ( ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, EndPoint) +from common.tools.context_queries.CheckType import ( + device_type_is_datacenter, device_type_is_network, endpoint_type_is_border) +from common.tools.context_queries.Device import add_device_to_topology, get_existing_device_uuids from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device, json_device_id from context.client.ContextClient import ContextClient -from .Tools import ( - add_device_to_topology, device_type_is_datacenter, device_type_is_network, endpoint_type_is_border, - get_existing_device_uuids) LOGGER = logging.getLogger(__name__) diff --git a/src/interdomain/service/topology_abstractor/AbstractLink.py b/src/interdomain/service/topology_abstractor/AbstractLink.py index 2481f427a..7fe7b07b0 100644 --- a/src/interdomain/service/topology_abstractor/AbstractLink.py +++ b/src/interdomain/service/topology_abstractor/AbstractLink.py @@ -16,10 +16,10 @@ import copy, logging from typing import Dict, List, Optional, Tuple from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.proto.context_pb2 import ContextId, EndPointId, Link, LinkId +from common.tools.context_queries.Link import add_link_to_topology, get_existing_link_uuids from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Link import json_link, json_link_id from context.client.ContextClient import ContextClient -from .Tools import add_link_to_topology, get_existing_link_uuids LOGGER = logging.getLogger(__name__) diff --git a/src/interdomain/service/topology_abstractor/Tools.py b/src/interdomain/service/topology_abstractor/Tools.py deleted file mode 100644 index 7d7885b99..000000000 --- a/src/interdomain/service/topology_abstractor/Tools.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import List, Set, Union -from common.DeviceTypes import DeviceTypeEnum -from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology, TopologyId -from common.tools.object_factory.Context import json_context, json_context_id -from common.tools.object_factory.Topology import json_topology, json_topology_id -from context.client.ContextClient import ContextClient - -LOGGER = logging.getLogger(__name__) - -def create_context( - context_client : ContextClient, context_uuid : str -) -> None: - existing_context_ids = context_client.ListContextIds(Empty()) - existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids} - if context_uuid in existing_context_uuids: return - context_client.SetContext(Context(**json_context(context_uuid))) - -def create_topology( - context_client : ContextClient, context_uuid : str, topology_uuid : str -) -> None: - context_id = ContextId(**json_context_id(context_uuid)) - existing_topology_ids = context_client.ListTopologyIds(context_id) - existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids} - if topology_uuid in existing_topology_uuids: return - context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id))) - -def create_missing_topologies( - context_client : ContextClient, context_id : ContextId, topology_uuids : List[str] -) -> None: - # Find existing topologies within own context - existing_topology_ids = context_client.ListTopologyIds(context_id) - existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids} - - # Create topologies within provided context - for topology_uuid in topology_uuids: - if topology_uuid in existing_topology_uuids: continue - grpc_topology = Topology(**json_topology(topology_uuid, context_id=context_id)) - context_client.SetTopology(grpc_topology) - -def get_existing_device_uuids(context_client : ContextClient) -> Set[str]: - existing_device_ids = context_client.ListDeviceIds(Empty()) - existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids} - return existing_device_uuids - -def get_existing_link_uuids(context_client : ContextClient) -> Set[str]: - existing_link_ids = context_client.ListLinkIds(Empty()) - existing_link_uuids = {link_id.link_uuid.uuid for link_id in existing_link_ids.link_ids} - return existing_link_uuids - -def add_device_to_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str, device_uuid : str -) -> bool: - topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) - topology_ro = context_client.GetTopology(topology_id) - device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids} - if device_uuid in device_uuids: return False # already existed - - topology_rw = Topology() - topology_rw.CopyFrom(topology_ro) - topology_rw.device_ids.add().device_uuid.uuid = device_uuid - context_client.SetTopology(topology_rw) - return True - -def add_link_to_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str, link_uuid : str -) -> bool: - topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) - topology_ro = context_client.GetTopology(topology_id) - link_uuids = {link_id.link_uuid.uuid for link_id in topology_ro.link_ids} - if link_uuid in link_uuids: return False # already existed - - topology_rw = Topology() - topology_rw.CopyFrom(topology_ro) - topology_rw.link_ids.add().link_uuid.uuid = link_uuid - context_client.SetTopology(topology_rw) - return True - -def get_uuids_of_devices_in_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str -) -> List[str]: - topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) - topology = context_client.GetTopology(topology_id) - device_uuids = [device_id.device_uuid.uuid for device_id in topology.device_ids] - return device_uuids - -def get_uuids_of_links_in_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str -) -> List[str]: - topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id)) - topology = context_client.GetTopology(topology_id) - link_uuids = [link_id.link_uuid.uuid for link_id in topology.link_ids] - return link_uuids - -def get_devices_in_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str -) -> List[Device]: - device_uuids = get_uuids_of_devices_in_topology(context_client, context_id, topology_uuid) - - all_devices = context_client.ListDevices(Empty()) - devices_in_topology = list() - for device in all_devices.devices: - device_uuid = device.device_id.device_uuid.uuid - if device_uuid not in device_uuids: continue - devices_in_topology.append(device) - - return devices_in_topology - -def get_links_in_topology( - context_client : ContextClient, context_id : ContextId, topology_uuid : str -) -> List[Link]: - link_uuids = get_uuids_of_links_in_topology(context_client, context_id, topology_uuid) - - all_links = context_client.ListLinks(Empty()) - links_in_topology = list() - for link in all_links.links: - link_uuid = link.link_id.link_uuid.uuid - if link_uuid not in link_uuids: continue - links_in_topology.append(link) - - return links_in_topology - -def device_type_is_datacenter(device_type : Union[str, DeviceTypeEnum]) -> bool: - return device_type in { - DeviceTypeEnum.DATACENTER, DeviceTypeEnum.DATACENTER.value, - DeviceTypeEnum.EMULATED_DATACENTER, DeviceTypeEnum.EMULATED_DATACENTER.value - } - -def device_type_is_network(device_type : Union[str, DeviceTypeEnum]) -> bool: - return device_type in {DeviceTypeEnum.NETWORK, DeviceTypeEnum.NETWORK.value} - -def endpoint_type_is_border(endpoint_type : str) -> bool: - return str(endpoint_type).endswith('/border') diff --git a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py index 94abbfa52..5729fe733 100644 --- a/src/interdomain/service/topology_abstractor/TopologyAbstractor.py +++ b/src/interdomain/service/topology_abstractor/TopologyAbstractor.py @@ -19,6 +19,12 @@ from common.DeviceTypes import DeviceTypeEnum from common.proto.context_pb2 import ( ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, TopologyEvent) +from common.tools.context_queries.CheckType import ( + device_type_is_datacenter, device_type_is_network, endpoint_type_is_border) +from common.tools.context_queries.Context import create_context +from common.tools.context_queries.Device import get_devices_in_topology, get_uuids_of_devices_in_topology +from common.tools.context_queries.Link import get_links_in_topology +from common.tools.context_queries.Topology import create_missing_topologies from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device_id @@ -29,9 +35,6 @@ from dlt.connector.client.DltConnectorClient import DltConnectorClient from .AbstractDevice import AbstractDevice from .AbstractLink import AbstractLink from .DltRecordSender import DltRecordSender -from .Tools import ( - create_context, create_missing_topologies, device_type_is_datacenter, device_type_is_network, - endpoint_type_is_border, get_devices_in_topology, get_links_in_topology, get_uuids_of_devices_in_topology) from .Types import EventTypes LOGGER = logging.getLogger(__name__) -- GitLab