Commit 1a929de7 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'fix/load-devices-parallel' into 'release/2.0.1'

Common - Tool - DescriptorLoader:

See merge request !59
parents 1a724cc7 69f84c91
Loading
Loading
Loading
Loading
+27 −9
Original line number Original line Diff line number Diff line
@@ -31,8 +31,8 @@
#    for message,level in compose_notifications(results):
#    for message,level in compose_notifications(results):
#        loggers.get(level)(message)
#        loggers.get(level)(message)


import json
import concurrent.futures, json, logging, operator
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from context.client.ContextClient import ContextClient
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.client.DeviceClient import DeviceClient
@@ -43,6 +43,8 @@ from .Tools import (
    get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
    get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
    get_descriptors_add_topologies, split_devices_by_rules)
    get_descriptors_add_topologies, split_devices_by_rules)


LOGGER = logging.getLogger(__name__)

ENTITY_TO_TEXT = {
ENTITY_TO_TEXT = {
    # name   => singular,    plural
    # name   => singular,    plural
    'context'   : ('Context',    'Contexts'   ),
    'context'   : ('Context',    'Contexts'   ),
@@ -79,7 +81,7 @@ def compose_notifications(results : TypeResults) -> TypeNotificationList:


class DescriptorLoader:
class DescriptorLoader:
    def __init__(
    def __init__(
        self, descriptors : Union[str, Dict],
        self, descriptors : Union[str, Dict], num_workers : int = 1,
        context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
        context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
    ) -> None:
    ) -> None:
@@ -93,6 +95,8 @@ class DescriptorLoader:
        self.__slices      = self.__descriptors.get('slices'     , [])
        self.__slices      = self.__descriptors.get('slices'     , [])
        self.__connections = self.__descriptors.get('connections', [])
        self.__connections = self.__descriptors.get('connections', [])


        self.__num_workers = num_workers

        self.__contexts_add   = None
        self.__contexts_add   = None
        self.__topologies_add = None
        self.__topologies_add = None
        self.__devices_add    = None
        self.__devices_add    = None
@@ -242,12 +246,26 @@ class DescriptorLoader:
        #self.__dev_cli.close()
        #self.__dev_cli.close()
        #self.__ctx_cli.close()
        #self.__ctx_cli.close()


    @staticmethod
    def worker(grpc_method, grpc_class, entity) -> Any:
        return grpc_method(grpc_class(**entity))

    def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
    def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
        num_ok, error_list = 0, []
        num_ok, error_list = 0, []
        for entity in entities:

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor:
            future_to_entity = {
                executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity)
                for i,entity in enumerate(entities)
            }

            for future in concurrent.futures.as_completed(future_to_entity):
                i, entity = future_to_entity[future]
                try:
                try:
                grpc_method(grpc_class(**entity))
                    _ = future.result()
                    num_ok += 1
                    num_ok += 1
                except Exception as e:  # pylint: disable=broad-except
                except Exception as e:  # pylint: disable=broad-except
                error_list.append(f'{str(entity)}: {str(e)}')
                    error_list.append((i, f'{str(entity)}: {str(e)}'))

        error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
        self.__results.append((entity_name, action_name, num_ok, error_list))
        self.__results.append((entity_name, action_name, num_ok, error_list))
+3 −1
Original line number Original line Diff line number Diff line
@@ -34,6 +34,8 @@ slice_client = SliceClient()


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)


DESCRIPTOR_LOADER_NUM_WORKERS = 10

def process_descriptors(descriptors):
def process_descriptors(descriptors):
    try:
    try:
        descriptors_file = request.files[descriptors.name]
        descriptors_file = request.files[descriptors.name]
@@ -43,7 +45,7 @@ def process_descriptors(descriptors):
        flash(f'Unable to load descriptor file: {str(e)}', 'danger')
        flash(f'Unable to load descriptor file: {str(e)}', 'danger')
        return
        return


    descriptor_loader = DescriptorLoader(descriptors)
    descriptor_loader = DescriptorLoader(descriptors, num_workers=DESCRIPTOR_LOADER_NUM_WORKERS)
    results = descriptor_loader.process()
    results = descriptor_loader.process()
    for message,level in compose_notifications(results):
    for message,level in compose_notifications(results):
        if level == 'error': LOGGER.warning('ERROR message={:s}'.format(str(message)))
        if level == 'error': LOGGER.warning('ERROR message={:s}'.format(str(message)))