diff --git a/src/common/tools/descriptor/Loader.py b/src/common/tools/descriptor/Loader.py index fc3b008b4004efe5afc270da65246c4635c777c3..5972d425be5298ec7fcb63bd28b50f3643363ae4 100644 --- a/src/common/tools/descriptor/Loader.py +++ b/src/common/tools/descriptor/Loader.py @@ -31,8 +31,8 @@ # for message,level in compose_notifications(results): # loggers.get(level)(message) -import json -from typing import Dict, List, Optional, Tuple, Union +import concurrent.futures, json, logging, operator +from typing import Any, Dict, List, Optional, Tuple, Union from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient @@ -43,6 +43,8 @@ from .Tools import ( get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices, get_descriptors_add_topologies, split_devices_by_rules) +LOGGER = logging.getLogger(__name__) + ENTITY_TO_TEXT = { # name => singular, plural 'context' : ('Context', 'Contexts' ), @@ -79,7 +81,7 @@ def compose_notifications(results : TypeResults) -> TypeNotificationList: class DescriptorLoader: def __init__( - self, descriptors : Union[str, Dict], + self, descriptors : Union[str, Dict], num_workers : int = 1, context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None, service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None ) -> None: @@ -93,6 +95,8 @@ class DescriptorLoader: self.__slices = self.__descriptors.get('slices' , []) self.__connections = self.__descriptors.get('connections', []) + self.__num_workers = num_workers + self.__contexts_add = None self.__topologies_add = None self.__devices_add = None @@ -242,12 +246,26 @@ class DescriptorLoader: #self.__dev_cli.close() #self.__ctx_cli.close() + @staticmethod + def worker(grpc_method, grpc_class, entity) -> Any: + return grpc_method(grpc_class(**entity)) + def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None: num_ok, error_list = 0, [] - for entity in entities: - try: - grpc_method(grpc_class(**entity)) - num_ok += 1 - except Exception as e: # pylint: disable=broad-except - error_list.append(f'{str(entity)}: {str(e)}') + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor: + future_to_entity = { + executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity) + for i,entity in enumerate(entities) + } + + for future in concurrent.futures.as_completed(future_to_entity): + i, entity = future_to_entity[future] + try: + _ = future.result() + num_ok += 1 + except Exception as e: # pylint: disable=broad-except + error_list.append((i, f'{str(entity)}: {str(e)}')) + + error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))] self.__results.append((entity_name, action_name, num_ok, error_list)) diff --git a/src/webui/service/main/routes.py b/src/webui/service/main/routes.py index 38d13aad562f3e55490952db84ef784f87697739..dcbbf71a6fee6ebd040f14c7d0d2cb07ba9ee085 100644 --- a/src/webui/service/main/routes.py +++ b/src/webui/service/main/routes.py @@ -34,6 +34,8 @@ slice_client = SliceClient() LOGGER = logging.getLogger(__name__) +DESCRIPTOR_LOADER_NUM_WORKERS = 10 + def process_descriptors(descriptors): try: descriptors_file = request.files[descriptors.name] @@ -43,7 +45,7 @@ def process_descriptors(descriptors): flash(f'Unable to load descriptor file: {str(e)}', 'danger') return - descriptor_loader = DescriptorLoader(descriptors) + descriptor_loader = DescriptorLoader(descriptors, num_workers=DESCRIPTOR_LOADER_NUM_WORKERS) results = descriptor_loader.process() for message,level in compose_notifications(results): if level == 'error': LOGGER.warning('ERROR message={:s}'.format(str(message)))