Skip to content
Snippets Groups Projects

Common - Tool - DescriptorLoader:

Merged Lluis Gifre Renom requested to merge fix/load-devices-parallel into release/2.0.1
2 files
+ 30
10
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -31,8 +31,8 @@
@@ -31,8 +31,8 @@
# for message,level in compose_notifications(results):
# for message,level in compose_notifications(results):
# loggers.get(level)(message)
# loggers.get(level)(message)
import json
import concurrent.futures, json, logging, operator
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from context.client.ContextClient import ContextClient
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.client.DeviceClient import DeviceClient
@@ -43,6 +43,8 @@ from .Tools import (
@@ -43,6 +43,8 @@ from .Tools import (
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_devices_by_rules)
get_descriptors_add_topologies, split_devices_by_rules)
 
LOGGER = logging.getLogger(__name__)
 
ENTITY_TO_TEXT = {
ENTITY_TO_TEXT = {
# name => singular, plural
# name => singular, plural
'context' : ('Context', 'Contexts' ),
'context' : ('Context', 'Contexts' ),
@@ -79,7 +81,7 @@ def compose_notifications(results : TypeResults) -> TypeNotificationList:
@@ -79,7 +81,7 @@ def compose_notifications(results : TypeResults) -> TypeNotificationList:
class DescriptorLoader:
class DescriptorLoader:
def __init__(
def __init__(
self, descriptors : Union[str, Dict],
self, descriptors : Union[str, Dict], num_workers : int = 1,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
) -> None:
) -> None:
@@ -93,6 +95,8 @@ class DescriptorLoader:
@@ -93,6 +95,8 @@ class DescriptorLoader:
self.__slices = self.__descriptors.get('slices' , [])
self.__slices = self.__descriptors.get('slices' , [])
self.__connections = self.__descriptors.get('connections', [])
self.__connections = self.__descriptors.get('connections', [])
 
self.__num_workers = num_workers
 
self.__contexts_add = None
self.__contexts_add = None
self.__topologies_add = None
self.__topologies_add = None
self.__devices_add = None
self.__devices_add = None
@@ -242,12 +246,26 @@ class DescriptorLoader:
@@ -242,12 +246,26 @@ class DescriptorLoader:
#self.__dev_cli.close()
#self.__dev_cli.close()
#self.__ctx_cli.close()
#self.__ctx_cli.close()
 
@staticmethod
 
def worker(grpc_method, grpc_class, entity) -> Any:
 
return grpc_method(grpc_class(**entity))
 
def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
num_ok, error_list = 0, []
num_ok, error_list = 0, []
for entity in entities:
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor:
grpc_method(grpc_class(**entity))
future_to_entity = {
num_ok += 1
executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity)
except Exception as e: # pylint: disable=broad-except
for i,entity in enumerate(entities)
error_list.append(f'{str(entity)}: {str(e)}')
}
 
 
for future in concurrent.futures.as_completed(future_to_entity):
 
i, entity = future_to_entity[future]
 
try:
 
_ = future.result()
 
num_ok += 1
 
except Exception as e: # pylint: disable=broad-except
 
error_list.append((i, f'{str(entity)}: {str(e)}'))
 
 
error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
self.__results.append((entity_name, action_name, num_ok, error_list))
self.__results.append((entity_name, action_name, num_ok, error_list))
Loading