Commit d7961f75 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Descriptor Loader tool:

- added support for dictionary and file-based loading of descriptors
- added getter methods
- added validation and unload methods
- integrated helper methods
- updated documentation
parent ddf424f1
Loading
Loading
Loading
Loading

src/common/tests/LoadScenario.py

deleted100644 → 0
+0 −50
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from common.tools.descriptor.Loader import DescriptorLoader, compose_notifications
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient

LOGGER = logging.getLogger(__name__)
LOGGERS = {
    'success': LOGGER.info,
    'danger' : LOGGER.error,
    'error'  : LOGGER.error,
}

def load_scenario_from_descriptor(
    descriptor_file : str, context_client : ContextClient, device_client : DeviceClient,
    service_client : ServiceClient, slice_client : SliceClient
) -> DescriptorLoader:
    with open(descriptor_file, 'r', encoding='UTF-8') as f:
        descriptors = f.read()

    descriptor_loader = DescriptorLoader(
        descriptors,
        context_client=context_client, device_client=device_client,
        service_client=service_client, slice_client=slice_client)
    results = descriptor_loader.process()

    num_errors = 0
    for message,level in compose_notifications(results):
        LOGGERS.get(level)(message)
        if level != 'success': num_errors += 1
    if num_errors > 0:
        MSG = 'Failed to load descriptors in file {:s}'
        raise Exception(MSG.format(str(descriptor_file)))

    return descriptor_loader
 No newline at end of file
+133 −24
Original line number Diff line number Diff line
@@ -15,25 +15,30 @@
# SDN controller descriptor loader

# Usage example (WebUI):
#    descriptors = json.loads(descriptors_data_from_client)
#    descriptors = json.loads(
#       descriptors=descriptors_data_from_client, num_workers=10,
#       context_client=..., device_client=..., service_client=..., slice_client=...)
#    descriptor_loader = DescriptorLoader(descriptors)
#    results = descriptor_loader.process()
#    for message,level in compose_notifications(results):
#        flash(message, level)

# Usage example (pytest):
#    with open('path/to/descriptor.json', 'r', encoding='UTF-8') as f:
#        descriptors = json.loads(f.read())
#    descriptor_loader = DescriptorLoader(
#       descriptors, context_client=..., device_client=..., service_client=..., slice_client=...)
#       descriptors_file='path/to/descriptor.json', num_workers=10,
#       context_client=..., device_client=..., service_client=..., slice_client=...)
#    results = descriptor_loader.process()
#    loggers = {'success': LOGGER.info, 'danger': LOGGER.error, 'error': LOGGER.error}
#    for message,level in compose_notifications(results):
#        loggers.get(level)(message)
#    check_results(results, descriptor_loader)
#    descriptor_loader.validate()
#    # do test ...
#    descriptor_loader.unload()

import concurrent.futures, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from common.proto.context_pb2 import (
    Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId,
    Topology, TopologyId)
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
@@ -44,6 +49,11 @@ from .Tools import (
    get_descriptors_add_topologies, split_devices_by_rules)

LOGGER = logging.getLogger(__name__)
LOGGERS = {
    'success': LOGGER.info,
    'danger' : LOGGER.error,
    'error'  : LOGGER.error,
}

ENTITY_TO_TEXT = {
    # name   => singular,    plural
@@ -67,25 +77,26 @@ TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_o
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]

def compose_notifications(results : TypeResults) -> TypeNotificationList:
    notifications = []
    for entity_name, action_name, num_ok, error_list in results:
        entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name]
        action_infinitive, action_past = ACTION_TO_TEXT[action_name]
        num_err = len(error_list)
        for error in error_list:
            notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error'))
        if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success'))
        if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger'))
    return notifications

class DescriptorLoader:
    def __init__(
        self, descriptors : Union[str, Dict], num_workers : int = 1,
        self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
        num_workers : int = 1,
        context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
    ) -> None:
        if (descriptors is None) == (descriptors_file is None):
            raise Exception('Exactly one of "descriptors" or "descriptors_file" is required')
        
        if descriptors_file is not None:
            with open(descriptors_file, 'r', encoding='UTF-8') as f:
                self.__descriptors = json.loads(f.read())
            self.__descriptor_file_path = descriptors_file
        else: # descriptors is not None
            self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors
            self.__descriptor_file_path = '<dict>'

        self.__num_workers = num_workers

        self.__dummy_mode  = self.__descriptors.get('dummy_mode' , False)
        self.__contexts    = self.__descriptors.get('contexts'   , [])
        self.__topologies  = self.__descriptors.get('topologies' , [])
@@ -95,8 +106,6 @@ class DescriptorLoader:
        self.__slices      = self.__descriptors.get('slices'     , [])
        self.__connections = self.__descriptors.get('connections', [])

        self.__num_workers = num_workers

        self.__contexts_add   = None
        self.__topologies_add = None
        self.__devices_add    = None
@@ -111,6 +120,24 @@ class DescriptorLoader:

        self.__results : TypeResults = list()

    @property
    def descriptor_file_path(self) -> Optional[str]: return self.__descriptor_file_path

    @property
    def num_workers(self) -> int: return self.__num_workers

    @property
    def context_client(self) -> Optional[ContextClient]: return self.__ctx_cli

    @property
    def device_client(self) -> Optional[DeviceClient]: return self.__dev_cli

    @property
    def service_client(self) -> Optional[ServiceClient]: return self.__svc_cli

    @property
    def slice_client(self) -> Optional[SliceClient]: return self.__slc_cli

    @property
    def contexts(self) -> List[Dict]: return self.__contexts

@@ -269,3 +296,85 @@ class DescriptorLoader:

        error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
        self.__results.append((entity_name, action_name, num_ok, error_list))

    def validate(self) -> None:
        self.__ctx_cli.connect()

        contexts = self.__ctx_cli.ListContexts(Empty())
        assert len(contexts.contexts) == self.num_contexts

        for context_uuid, num_topologies in self.num_topologies.items():
            response = self.__ctx_cli.ListTopologies(ContextId(**json_context_id(context_uuid)))
            assert len(response.topologies) == num_topologies

        response = self.__ctx_cli.ListDevices(Empty())
        assert len(response.devices) == self.num_devices

        response = self.__ctx_cli.ListLinks(Empty())
        assert len(response.links) == self.num_links

        for context_uuid, num_services in self.num_services.items():
            response = self.__ctx_cli.ListServices(ContextId(**json_context_id(context_uuid)))
            assert len(response.services) == num_services

        for context_uuid, num_slices in self.num_slices.items():
            response = self.__ctx_cli.ListSlices(ContextId(**json_context_id(context_uuid)))
            assert len(response.slices) == num_slices

    def unload(self) -> None:
        self.__ctx_cli.connect()
        self.__dev_cli.connect()
        self.__svc_cli.connect()
        self.__slc_cli.connect()

        for _, slice_list in self.slices.items():
            for slice_ in slice_list:
                self.__slc_cli.DeleteSlice(SliceId(**slice_['slice_id']))

        for _, service_list in self.services.items():
            for service in service_list:
                self.__svc_cli.DeleteService(ServiceId(**service['service_id']))

        for link in self.links:
            self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))

        for device in self.devices:
            self.__dev_cli.DeleteDevice(DeviceId(**device['device_id']))

        for _, topology_list in self.topologies.items():
            for topology in topology_list:
                self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id']))

        for context in self.contexts:
            self.__ctx_cli.RemoveContext(ContextId(**context['context_id']))

def compose_notifications(results : TypeResults) -> TypeNotificationList:
    notifications = []
    for entity_name, action_name, num_ok, error_list in results:
        entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name]
        action_infinitive, action_past = ACTION_TO_TEXT[action_name]
        num_err = len(error_list)
        for error in error_list:
            notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error'))
        if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success'))
        if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger'))
    return notifications

def check_descriptor_load_results(results : TypeResults, descriptor_loader : DescriptorLoader) -> None:
    num_errors = 0
    for message,level in compose_notifications(results):
        LOGGERS.get(level)(message)
        if level != 'success': num_errors += 1
    if num_errors > 0:
        MSG = 'Failed to load descriptors from "{:s}"'
        raise Exception(MSG.format(str(descriptor_loader.descriptor_file_path)))

def validate_empty_scenario(context_client : ContextClient) -> None:
    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == 0

    response = context_client.ListDevices(Empty())
    assert len(response.devices) == 0

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 0