# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from typing import Dict, List, Optional, Set
from common.proto.pathcomp_pb2 import Algorithm_KDisjointPath, Algorithm_KShortestPath, PathCompReply
from ._Algorithm import _Algorithm
from .KShortestPathAlgorithm import KShortestPathAlgorithm


class KDisjointPathAlgorithm(_Algorithm):
    """Search for link-disjoint paths by running a K-Shortest-Path algorithm repeatedly.

    Each iteration asks the inner algorithm for one path per service and then
    removes the links used by those paths from the inner algorithm's link list,
    so the following iteration cannot reuse them.
    """

    def __init__(self, algorithm : Algorithm_KDisjointPath, class_name=__name__) -> None:
        """Store the requested number of disjoint paths.

        Args:
            algorithm: request settings; only ``num_disjoint`` is read here.
            class_name: forwarded to the base-class constructor.
        """
        super().__init__('KDP', False, class_name=class_name)
        self.num_disjoint = algorithm.num_disjoint

    def _collect_path_links(self, endpoint_to_link_dict : Dict, path_endpoints : List[Dict]) -> List[Dict]:
        """Return the links traversed by a path, in order and without consecutive repeats.

        Args:
            endpoint_to_link_dict: mapping from ``(device_uuid, endpoint_uuid)``
                to an item whose first element is the JSON link.
            path_endpoints: path hops, each carrying ``device_id`` and
                ``endpoint_uuid``.

        Returns:
            The JSON links found for the hops. Hops with no known link are
            logged as warnings and skipped.
        """
        path_links : List[Dict] = list()
        for endpoint in path_endpoints:
            device_uuid = endpoint['device_id']
            endpoint_uuid = endpoint['endpoint_uuid']
            item = endpoint_to_link_dict.get((device_uuid, endpoint_uuid))
            if item is None:
                MSG = 'Link for Endpoint({:s}, {:s}) not found'
                self.logger.warning(MSG.format(device_uuid, endpoint_uuid))
                continue
            json_link, _ = item
            # Consecutive hops can resolve to the same link (presumably its two
            # ends); record it once.
            if not path_links or path_links[-1]['link_Id'] != json_link['link_Id']:
                path_links.append(json_link)
        return path_links

    def execute(self, dump_request_filename: Optional[str] = None, dump_reply_filename: Optional[str] = None) -> None:
        """Run ``num_disjoint`` rounds of shortest-path computation over a shrinking link list.

        Args:
            dump_request_filename: accepted for interface compatibility; not
                used by this implementation.
            dump_reply_filename: accepted for interface compatibility; not
                used by this implementation.

        Side effects:
            Logs the paths found and sets ``self.json_reply`` to an empty dict
            (composing the final reply is still pending, see TODO below).
        """
        algorithm = KShortestPathAlgorithm(Algorithm_KShortestPath(k_inspection=0, k_return=1))
        algorithm.sync_paths = True

        # The inner algorithm works on the same topology and services as this one.
        algorithm.device_list = self.device_list
        algorithm.device_dict = self.device_dict
        algorithm.endpoint_dict = self.endpoint_dict
        algorithm.link_list = self.link_list
        algorithm.link_dict = self.link_dict
        algorithm.endpoint_to_link_dict = self.endpoint_to_link_dict
        algorithm.service_list = self.service_list
        algorithm.service_dict = self.service_dict

        # (context id, service uuid) -> one entry per iteration: list of links, or None if no path
        disjoint_paths : Dict = dict()

        for num_path in range(self.num_disjoint):
            algorithm.execute('ksp-{:d}-request.json'.format(num_path), 'ksp-{:d}-reply.txt'.format(num_path))
            response_list = algorithm.json_reply.get('response-list', [])

            # Accumulate the links used by every service in this iteration. The set
            # is created before the loop so that pruning below is well defined even
            # when there are no responses or none of them carries a path.
            used_link_ids : Set = set()

            for response in response_list:
                service_id = response['serviceId']
                service_key = (service_id['contextId'], service_id['service_uuid'])
                disjoint_paths_service = disjoint_paths.setdefault(service_key, list())

                no_path_issue = response.get('noPath', {}).get('issue')
                if no_path_issue is not None:
                    # Keep a placeholder so each service has one entry per iteration.
                    disjoint_paths_service.append(None)
                    continue

                path_endpoints = response['path'][0]['devices']
                path_links = self._collect_path_links(algorithm.endpoint_to_link_dict, path_endpoints)
                self.logger.info('path_links = {:s}'.format(str(path_links)))
                disjoint_paths_service.append(path_links)
                used_link_ids.update(link['link_Id'] for link in path_links)

            # Drop the links consumed in this iteration so the next one must avoid them.
            new_link_list = [link for link in algorithm.link_list if link['link_Id'] not in used_link_ids]
            self.logger.info('algorithm.link_list = {:s}'.format(str(algorithm.link_list)))
            self.logger.info('new_link_list = {:s}'.format(str(new_link_list)))
            algorithm.link_list = new_link_list

        self.logger.info('disjoint_paths = {:s}'.format(str(disjoint_paths)))

        # TODO: compose disjoint path found
        self.json_reply = {}