diff --git a/src/common/tests/LoadScenario.py b/src/common/tests/LoadScenario.py deleted file mode 100644 index 93cf3708cfc5f8a4296a5cb68772984beefd7563..0000000000000000000000000000000000000000 --- a/src/common/tests/LoadScenario.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from common.tools.descriptor.Loader import DescriptorLoader, compose_notifications -from context.client.ContextClient import ContextClient -from device.client.DeviceClient import DeviceClient -from service.client.ServiceClient import ServiceClient -from slice.client.SliceClient import SliceClient - -LOGGER = logging.getLogger(__name__) -LOGGERS = { - 'success': LOGGER.info, - 'danger' : LOGGER.error, - 'error' : LOGGER.error, -} - -def load_scenario_from_descriptor( - descriptor_file : str, context_client : ContextClient, device_client : DeviceClient, - service_client : ServiceClient, slice_client : SliceClient -) -> DescriptorLoader: - with open(descriptor_file, 'r', encoding='UTF-8') as f: - descriptors = f.read() - - descriptor_loader = DescriptorLoader( - descriptors, - context_client=context_client, device_client=device_client, - service_client=service_client, slice_client=slice_client) - results = descriptor_loader.process() - - num_errors = 0 - for message,level in compose_notifications(results): - LOGGERS.get(level)(message) - if level != 'success': num_errors += 1 - if num_errors > 0: - MSG = 'Failed to load descriptors in file {:s}' - raise Exception(MSG.format(str(descriptor_file))) - - return descriptor_loader \ No newline at end of file diff --git a/src/common/tools/descriptor/Loader.py b/src/common/tools/descriptor/Loader.py index 5972d425be5298ec7fcb63bd28b50f3643363ae4..0e1d8c7371e87b47bfc47a4242e00039add48e7f 100644 --- a/src/common/tools/descriptor/Loader.py +++ b/src/common/tools/descriptor/Loader.py @@ -15,25 +15,30 @@ # SDN controller descriptor loader # Usage example (WebUI): -# descriptors = json.loads(descriptors_data_from_client) +# descriptors = json.loads( +# descriptors=descriptors_data_from_client, num_workers=10, +# context_client=..., device_client=..., service_client=..., slice_client=...) # descriptor_loader = DescriptorLoader(descriptors) # results = descriptor_loader.process() # for message,level in compose_notifications(results): # flash(message, level) # Usage example (pytest): -# with open('path/to/descriptor.json', 'r', encoding='UTF-8') as f: -# descriptors = json.loads(f.read()) # descriptor_loader = DescriptorLoader( -# descriptors, context_client=..., device_client=..., service_client=..., slice_client=...) +# descriptors_file='path/to/descriptor.json', num_workers=10, +# context_client=..., device_client=..., service_client=..., slice_client=...) # results = descriptor_loader.process() -# loggers = {'success': LOGGER.info, 'danger': LOGGER.error, 'error': LOGGER.error} -# for message,level in compose_notifications(results): -# loggers.get(level)(message) +# check_results(results, descriptor_loader) +# descriptor_loader.validate() +# # do test ... +# descriptor_loader.unload() import concurrent.futures, json, logging, operator from typing import Any, Dict, List, Optional, Tuple, Union -from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology +from common.proto.context_pb2 import ( + Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId, + Topology, TopologyId) +from common.tools.object_factory.Context import json_context_id from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient from service.client.ServiceClient import ServiceClient @@ -44,6 +49,11 @@ from .Tools import ( get_descriptors_add_topologies, split_devices_by_rules) LOGGER = logging.getLogger(__name__) +LOGGERS = { + 'success': LOGGER.info, + 'danger' : LOGGER.error, + 'error' : LOGGER.error, +} ENTITY_TO_TEXT = { # name => singular, plural @@ -67,25 +77,26 @@ TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_o TypeNotification = Tuple[str, str] # message, level TypeNotificationList = List[TypeNotification] -def compose_notifications(results : TypeResults) -> TypeNotificationList: - notifications = [] - for entity_name, action_name, num_ok, error_list in results: - entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name] - action_infinitive, action_past = ACTION_TO_TEXT[action_name] - num_err = len(error_list) - for error in error_list: - notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error')) - if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success')) - if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger')) - return notifications - class DescriptorLoader: def __init__( - self, descriptors : Union[str, Dict], num_workers : int = 1, + self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None, + num_workers : int = 1, context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None, service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None ) -> None: - self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors + if (descriptors is None) == (descriptors_file is None): + raise Exception('Exactly one of "descriptors" or "descriptors_file" is required') + + if descriptors_file is not None: + with open(descriptors_file, 'r', encoding='UTF-8') as f: + self.__descriptors = json.loads(f.read()) + self.__descriptor_file_path = descriptors_file + else: # descriptors is not None + self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors + self.__descriptor_file_path = '<dict>' + + self.__num_workers = num_workers + self.__dummy_mode = self.__descriptors.get('dummy_mode' , False) self.__contexts = self.__descriptors.get('contexts' , []) self.__topologies = self.__descriptors.get('topologies' , []) @@ -95,8 +106,6 @@ class DescriptorLoader: self.__slices = self.__descriptors.get('slices' , []) self.__connections = self.__descriptors.get('connections', []) - self.__num_workers = num_workers - self.__contexts_add = None self.__topologies_add = None self.__devices_add = None @@ -111,6 +120,24 @@ class DescriptorLoader: self.__results : TypeResults = list() + @property + def descriptor_file_path(self) -> Optional[str]: return self.__descriptor_file_path + + @property + def num_workers(self) -> int: return self.__num_workers + + @property + def context_client(self) -> Optional[ContextClient]: return self.__ctx_cli + + @property + def device_client(self) -> Optional[DeviceClient]: return self.__dev_cli + + @property + def service_client(self) -> Optional[ServiceClient]: return self.__svc_cli + + @property + def slice_client(self) -> Optional[SliceClient]: return self.__slc_cli + @property def contexts(self) -> List[Dict]: return self.__contexts @@ -269,3 +296,85 @@ class DescriptorLoader: error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))] self.__results.append((entity_name, action_name, num_ok, error_list)) + + def validate(self) -> None: + self.__ctx_cli.connect() + + contexts = self.__ctx_cli.ListContexts(Empty()) + assert len(contexts.contexts) == self.num_contexts + + for context_uuid, num_topologies in self.num_topologies.items(): + response = self.__ctx_cli.ListTopologies(ContextId(**json_context_id(context_uuid))) + assert len(response.topologies) == num_topologies + + response = self.__ctx_cli.ListDevices(Empty()) + assert len(response.devices) == self.num_devices + + response = self.__ctx_cli.ListLinks(Empty()) + assert len(response.links) == self.num_links + + for context_uuid, num_services in self.num_services.items(): + response = self.__ctx_cli.ListServices(ContextId(**json_context_id(context_uuid))) + assert len(response.services) == num_services + + for context_uuid, num_slices in self.num_slices.items(): + response = self.__ctx_cli.ListSlices(ContextId(**json_context_id(context_uuid))) + assert len(response.slices) == num_slices + + def unload(self) -> None: + self.__ctx_cli.connect() + self.__dev_cli.connect() + self.__svc_cli.connect() + self.__slc_cli.connect() + + for _, slice_list in self.slices.items(): + for slice_ in slice_list: + self.__slc_cli.DeleteSlice(SliceId(**slice_['slice_id'])) + + for _, service_list in self.services.items(): + for service in service_list: + self.__svc_cli.DeleteService(ServiceId(**service['service_id'])) + + for link in self.links: + self.__ctx_cli.RemoveLink(LinkId(**link['link_id'])) + + for device in self.devices: + self.__dev_cli.DeleteDevice(DeviceId(**device['device_id'])) + + for _, topology_list in self.topologies.items(): + for topology in topology_list: + self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id'])) + + for context in self.contexts: + self.__ctx_cli.RemoveContext(ContextId(**context['context_id'])) + +def compose_notifications(results : TypeResults) -> TypeNotificationList: + notifications = [] + for entity_name, action_name, num_ok, error_list in results: + entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name] + action_infinitive, action_past = ACTION_TO_TEXT[action_name] + num_err = len(error_list) + for error in error_list: + notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error')) + if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success')) + if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger')) + return notifications + +def check_descriptor_load_results(results : TypeResults, descriptor_loader : DescriptorLoader) -> None: + num_errors = 0 + for message,level in compose_notifications(results): + LOGGERS.get(level)(message) + if level != 'success': num_errors += 1 + if num_errors > 0: + MSG = 'Failed to load descriptors from "{:s}"' + raise Exception(MSG.format(str(descriptor_loader.descriptor_file_path))) + +def validate_empty_scenario(context_client : ContextClient) -> None: + response = context_client.ListContexts(Empty()) + assert len(response.contexts) == 0 + + response = context_client.ListDevices(Empty()) + assert len(response.devices) == 0 + + response = context_client.ListLinks(Empty()) + assert len(response.links) == 0