Skip to content
Snippets Groups Projects
Commit c44efbcf authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Tool - DescriptorLoader:

- Implemented parallel descriptor loading, especially for device onboarding
parent 1a724cc7
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!59Common - Tool - DescriptorLoader:
......@@ -31,8 +31,8 @@
# for message,level in compose_notifications(results):
# loggers.get(level)(message)
import json
from typing import Dict, List, Optional, Tuple, Union
import concurrent.futures, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
......@@ -43,6 +43,8 @@ from .Tools import (
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_devices_by_rules)
LOGGER = logging.getLogger(__name__)
ENTITY_TO_TEXT = {
# name => singular, plural
'context' : ('Context', 'Contexts' ),
......@@ -79,7 +81,7 @@ def compose_notifications(results : TypeResults) -> TypeNotificationList:
class DescriptorLoader:
def __init__(
self, descriptors : Union[str, Dict],
self, descriptors : Union[str, Dict], num_workers : int = 1,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
) -> None:
......@@ -93,6 +95,8 @@ class DescriptorLoader:
self.__slices = self.__descriptors.get('slices' , [])
self.__connections = self.__descriptors.get('connections', [])
self.__num_workers = num_workers
self.__contexts_add = None
self.__topologies_add = None
self.__devices_add = None
......@@ -242,12 +246,26 @@ class DescriptorLoader:
#self.__dev_cli.close()
#self.__ctx_cli.close()
@staticmethod
def worker(grpc_method, grpc_class, entity) -> Any:
return grpc_method(grpc_class(**entity))
def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
num_ok, error_list = 0, []
for entity in entities:
try:
grpc_method(grpc_class(**entity))
num_ok += 1
except Exception as e: # pylint: disable=broad-except
error_list.append(f'{str(entity)}: {str(e)}')
with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor:
future_to_entity = {
executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity)
for i,entity in enumerate(entities)
}
for future in concurrent.futures.as_completed(future_to_entity):
i, entity = future_to_entity[future]
try:
_ = future.result()
num_ok += 1
except Exception as e: # pylint: disable=broad-except
error_list.append((i, f'{str(entity)}: {str(e)}'))
error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
self.__results.append((entity_name, action_name, num_ok, error_list))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment