# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # SDN controller descriptor loader # Usage example (WebUI): # descriptors = json.loads( # descriptors=descriptors_data_from_client, num_workers=10, # context_client=..., device_client=..., service_client=..., slice_client=...) # descriptor_loader = DescriptorLoader(descriptors) # results = descriptor_loader.process() # for message,level in compose_notifications(results): # flash(message, level) # Usage example (pytest): # descriptor_loader = DescriptorLoader( # descriptors_file='path/to/descriptor.json', num_workers=10, # context_client=..., device_client=..., service_client=..., slice_client=...) # results = descriptor_loader.process() # check_results(results, descriptor_loader) # descriptor_loader.validate() # # do test ... # descriptor_loader.unload() import concurrent.futures, json, logging, operator from typing import Any, Dict, List, Optional, Tuple, Union from common.proto.context_pb2 import ( Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId, Topology, TopologyId) from common.tools.object_factory.Context import json_context_id from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient from service.client.ServiceClient import ServiceClient from slice.client.SliceClient import SliceClient from .Tools import ( format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules, get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices, get_descriptors_add_topologies, split_devices_by_rules) LOGGER = logging.getLogger(__name__) LOGGERS = { 'success': LOGGER.info, 'danger' : LOGGER.error, 'error' : LOGGER.error, } ENTITY_TO_TEXT = { # name => singular, plural 'context' : ('Context', 'Contexts' ), 'topology' : ('Topology', 'Topologies' ), 'device' : ('Device', 'Devices' ), 'link' : ('Link', 'Links' ), 'service' : ('Service', 'Services' ), 'slice' : ('Slice', 'Slices' ), 'connection': ('Connection', 'Connections'), } ACTION_TO_TEXT = { # action => infinitive, past 'add' : ('Add', 'Added'), 'update' : ('Update', 'Updated'), 'config' : ('Configure', 'Configured'), } TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_ok, list[error] TypeNotification = Tuple[str, str] # message, level TypeNotificationList = List[TypeNotification] class DescriptorLoader: def __init__( self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None, num_workers : int = 1, context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None, service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None ) -> None: if (descriptors is None) == (descriptors_file is None): # pylint: disable=broad-exception-raised raise Exception('Exactly one of "descriptors" or "descriptors_file" is required') if descriptors_file is not None: with open(descriptors_file, 'r', encoding='UTF-8') as f: self.__descriptors = json.loads(f.read()) self.__descriptor_file_path = descriptors_file else: # descriptors is not None self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors self.__descriptor_file_path = '' self.__num_workers = num_workers self.__dummy_mode = self.__descriptors.get('dummy_mode' , False) self.__contexts = self.__descriptors.get('contexts' , []) self.__topologies = self.__descriptors.get('topologies' , []) self.__devices = self.__descriptors.get('devices' , []) self.__links = self.__descriptors.get('links' , []) self.__services = self.__descriptors.get('services' , []) self.__slices = self.__descriptors.get('slices' , []) self.__connections = self.__descriptors.get('connections', []) self.__contexts_add = None self.__topologies_add = None self.__devices_add = None self.__devices_config = None self.__services_add = None self.__slices_add = None self.__ctx_cli = ContextClient() if context_client is None else context_client self.__dev_cli = DeviceClient() if device_client is None else device_client self.__svc_cli = ServiceClient() if service_client is None else service_client self.__slc_cli = SliceClient() if slice_client is None else slice_client self.__results : TypeResults = list() @property def descriptor_file_path(self) -> Optional[str]: return self.__descriptor_file_path @property def num_workers(self) -> int: return self.__num_workers @property def context_client(self) -> Optional[ContextClient]: return self.__ctx_cli @property def device_client(self) -> Optional[DeviceClient]: return self.__dev_cli @property def service_client(self) -> Optional[ServiceClient]: return self.__svc_cli @property def slice_client(self) -> Optional[SliceClient]: return self.__slc_cli @property def contexts(self) -> List[Dict]: return self.__contexts @property def num_contexts(self) -> int: return len(self.__contexts) @property def topologies(self) -> Dict[str, List[Dict]]: _topologies = {} for topology in self.__topologies: context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid'] _topologies.setdefault(context_uuid, []).append(topology) return _topologies @property def num_topologies(self) -> Dict[str, int]: _num_topologies = {} for topology in self.__topologies: context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid'] _num_topologies[context_uuid] = _num_topologies.get(context_uuid, 0) + 1 return _num_topologies @property def devices(self) -> List[Dict]: return self.__devices @property def num_devices(self) -> int: return len(self.__devices) @property def links(self) -> List[Dict]: return self.__links @property def num_links(self) -> int: return len(self.__links) @property def services(self) -> Dict[str, List[Dict]]: _services = {} for service in self.__services: context_uuid = service['service_id']['context_id']['context_uuid']['uuid'] _services.setdefault(context_uuid, []).append(service) return _services @property def num_services(self) -> Dict[str, int]: _num_services = {} for service in self.__services: context_uuid = service['service_id']['context_id']['context_uuid']['uuid'] _num_services[context_uuid] = _num_services.get(context_uuid, 0) + 1 return _num_services @property def slices(self) -> Dict[str, List[Dict]]: _slices = {} for slice_ in self.__slices: context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid'] _slices.setdefault(context_uuid, []).append(slice_) return _slices @property def num_slices(self) -> Dict[str, int]: _num_slices = {} for slice_ in self.__slices: context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid'] _num_slices[context_uuid] = _num_slices.get(context_uuid, 0) + 1 return _num_slices @property def connections(self) -> List[Dict]: return self.__connections @property def num_connections(self) -> int: return len(self.__connections) def process(self) -> TypeResults: # Format CustomConfigRules in Devices, Services and Slices provided in JSON format self.__devices = [format_device_custom_config_rules (device ) for device in self.__devices ] self.__services = [format_service_custom_config_rules(service) for service in self.__services] self.__slices = [format_slice_custom_config_rules (slice_ ) for slice_ in self.__slices ] # Context and Topology require to create the entity first, and add devices, links, services, # slices, etc. in a second stage. self.__contexts_add = get_descriptors_add_contexts(self.__contexts) self.__topologies_add = get_descriptors_add_topologies(self.__topologies) if self.__dummy_mode: self._load_dummy_mode() else: self._load_normal_mode() return self.__results def _load_dummy_mode(self) -> None: # Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks. self.__ctx_cli.connect() self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add ) self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add) self._process_descr('device', 'add', self.__ctx_cli.SetDevice, Device, self.__devices ) self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links ) self._process_descr('service', 'add', self.__ctx_cli.SetService, Service, self.__services ) self._process_descr('slice', 'add', self.__ctx_cli.SetSlice, Slice, self.__slices ) self._process_descr('connection', 'add', self.__ctx_cli.SetConnection, Connection, self.__connections ) # Update context and topology is useless: # - devices and links are assigned to topologies automatically by Context component # - topologies, services, and slices are assigned to contexts automatically by Context component #self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts ) #self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies ) #self.__ctx_cli.close() def _load_normal_mode(self) -> None: # Normal mode: follows the automated workflows in the different components assert len(self.__connections) == 0, 'in normal mode, connections should not be set' # Device, Service and Slice require to first create the entity and the configure it self.__devices_add, self.__devices_config = split_devices_by_rules(self.__devices) self.__services_add = get_descriptors_add_services(self.__services) self.__slices_add = get_descriptors_add_slices(self.__slices) self.__ctx_cli.connect() self.__dev_cli.connect() self.__svc_cli.connect() self.__slc_cli.connect() self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add ) self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add) self._process_descr('device', 'add', self.__dev_cli.AddDevice, Device, self.__devices_add ) self._process_descr('device', 'config', self.__dev_cli.ConfigureDevice, Device, self.__devices_config) self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links ) self._process_descr('service', 'add', self.__svc_cli.CreateService, Service, self.__services_add ) self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services ) self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add ) self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices ) # Update context and topology is useless: # - devices and links are assigned to topologies automatically by Context component # - topologies, services, and slices are assigned to contexts automatically by Context component #self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts ) #self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies ) #self.__slc_cli.close() #self.__svc_cli.close() #self.__dev_cli.close() #self.__ctx_cli.close() @staticmethod def worker(grpc_method, grpc_class, entity) -> Any: return grpc_method(grpc_class(**entity)) def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None: num_ok, error_list = 0, [] with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor: future_to_entity = { executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity) for i,entity in enumerate(entities) } for future in concurrent.futures.as_completed(future_to_entity): i, entity = future_to_entity[future] try: _ = future.result() num_ok += 1 except Exception as e: # pylint: disable=broad-except error_list.append((i, f'{str(entity)}: {str(e)}')) error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))] self.__results.append((entity_name, action_name, num_ok, error_list)) def validate(self) -> None: self.__ctx_cli.connect() contexts = self.__ctx_cli.ListContexts(Empty()) assert len(contexts.contexts) == self.num_contexts for context_uuid, num_topologies in self.num_topologies.items(): response = self.__ctx_cli.ListTopologies(ContextId(**json_context_id(context_uuid))) assert len(response.topologies) == num_topologies response = self.__ctx_cli.ListDevices(Empty()) assert len(response.devices) == self.num_devices response = self.__ctx_cli.ListLinks(Empty()) assert len(response.links) == self.num_links for context_uuid, num_services in self.num_services.items(): response = self.__ctx_cli.ListServices(ContextId(**json_context_id(context_uuid))) assert len(response.services) == num_services for context_uuid, num_slices in self.num_slices.items(): response = self.__ctx_cli.ListSlices(ContextId(**json_context_id(context_uuid))) assert len(response.slices) == num_slices def _unload_dummy_mode(self) -> None: # Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks. self.__ctx_cli.connect() for _, slice_list in self.slices.items(): for slice_ in slice_list: self.__ctx_cli.RemoveSlice(SliceId(**slice_['slice_id'])) for _, service_list in self.services.items(): for service in service_list: self.__ctx_cli.RemoveService(ServiceId(**service['service_id'])) for link in self.links: self.__ctx_cli.RemoveLink(LinkId(**link['link_id'])) for device in self.devices: self.__ctx_cli.RemoveDevice(DeviceId(**device['device_id'])) for _, topology_list in self.topologies.items(): for topology in topology_list: self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id'])) for context in self.contexts: self.__ctx_cli.RemoveContext(ContextId(**context['context_id'])) #self.__ctx_cli.close() def _unload_normal_mode(self) -> None: # Normal mode: follows the automated workflows in the different components self.__ctx_cli.connect() self.__dev_cli.connect() self.__svc_cli.connect() self.__slc_cli.connect() for _, slice_list in self.slices.items(): for slice_ in slice_list: self.__slc_cli.DeleteSlice(SliceId(**slice_['slice_id'])) for _, service_list in self.services.items(): for service in service_list: self.__svc_cli.DeleteService(ServiceId(**service['service_id'])) for link in self.links: self.__ctx_cli.RemoveLink(LinkId(**link['link_id'])) for device in self.devices: self.__dev_cli.DeleteDevice(DeviceId(**device['device_id'])) for _, topology_list in self.topologies.items(): for topology in topology_list: self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id'])) for context in self.contexts: self.__ctx_cli.RemoveContext(ContextId(**context['context_id'])) #self.__ctx_cli.close() #self.__dev_cli.close() #self.__svc_cli.close() #self.__slc_cli.close() def unload(self) -> None: if self.__dummy_mode: self._unload_dummy_mode() else: self._unload_normal_mode() def compose_notifications(results : TypeResults) -> TypeNotificationList: notifications = [] for entity_name, action_name, num_ok, error_list in results: entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name] action_infinitive, action_past = ACTION_TO_TEXT[action_name] num_err = len(error_list) for error in error_list: notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error')) if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success')) if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger')) return notifications def check_descriptor_load_results(results : TypeResults, descriptor_loader : DescriptorLoader) -> None: num_errors = 0 for message,level in compose_notifications(results): LOGGERS.get(level)(message) if level != 'success': num_errors += 1 if num_errors > 0: MSG = 'Failed to load descriptors from "{:s}"' raise Exception(MSG.format(str(descriptor_loader.descriptor_file_path))) def validate_empty_scenario(context_client : ContextClient) -> None: response = context_client.ListContexts(Empty()) assert len(response.contexts) == 0 response = context_client.ListDevices(Empty()) assert len(response.devices) == 0 response = context_client.ListLinks(Empty()) assert len(response.links) == 0