Loader.py 18.9 KB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# SDN controller descriptor loader

# Usage example (WebUI):
#    descriptors = json.loads(
#       descriptors=descriptors_data_from_client, num_workers=10,
#       context_client=..., device_client=..., service_client=..., slice_client=...)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    descriptor_loader = DescriptorLoader(descriptors)
#    results = descriptor_loader.process()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    for message,level in compose_notifications(results):
#        flash(message, level)

# Usage example (pytest):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    descriptor_loader = DescriptorLoader(
#       descriptors_file='path/to/descriptor.json', num_workers=10,
#       context_client=..., device_client=..., service_client=..., slice_client=...)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#    results = descriptor_loader.process()
#    check_results(results, descriptor_loader)
#    descriptor_loader.validate()
#    # do test ...
#    descriptor_loader.unload()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

import concurrent.futures, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import (
    Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId,
    Topology, TopologyId)
from common.tools.object_factory.Context import json_context_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .Tools import (
    format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules,
    get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
    get_descriptors_add_topologies, split_devices_by_rules)

LOGGER = logging.getLogger(__name__)
LOGGERS = {
    'success': LOGGER.info,
    'danger' : LOGGER.error,
    'error'  : LOGGER.error,
}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
ENTITY_TO_TEXT = {
    # name   => singular,    plural
    'context'   : ('Context',    'Contexts'   ),
    'topology'  : ('Topology',   'Topologies' ),
    'device'    : ('Device',     'Devices'    ),
    'link'      : ('Link',       'Links'      ),
    'service'   : ('Service',    'Services'   ),
    'slice'     : ('Slice',      'Slices'     ),
    'connection': ('Connection', 'Connections'),
}

ACTION_TO_TEXT = {
    # action =>  infinitive,  past
    'add'     : ('Add',       'Added'),
    'update'  : ('Update',    'Updated'),
    'config'  : ('Configure', 'Configured'),
}

TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_ok, list[error]
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]

class DescriptorLoader:
    def __init__(
        self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
        num_workers : int = 1,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
    ) -> None:
        if (descriptors is None) == (descriptors_file is None):
            # pylint: disable=broad-exception-raised
            raise Exception('Exactly one of "descriptors" or "descriptors_file" is required')
        
        if descriptors_file is not None:
            with open(descriptors_file, 'r', encoding='UTF-8') as f:
                self.__descriptors = json.loads(f.read())
            self.__descriptor_file_path = descriptors_file
        else: # descriptors is not None
            self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors
            self.__descriptor_file_path = '<dict>'

        self.__num_workers = num_workers

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.__dummy_mode  = self.__descriptors.get('dummy_mode' , False)
        self.__contexts    = self.__descriptors.get('contexts'   , [])
        self.__topologies  = self.__descriptors.get('topologies' , [])
        self.__devices     = self.__descriptors.get('devices'    , [])
        self.__links       = self.__descriptors.get('links'      , [])
        self.__services    = self.__descriptors.get('services'   , [])
        self.__slices      = self.__descriptors.get('slices'     , [])
        self.__connections = self.__descriptors.get('connections', [])

        self.__contexts_add   = None
        self.__topologies_add = None
        self.__devices_add    = None
        self.__devices_config = None
        self.__services_add   = None
        self.__slices_add     = None

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.__ctx_cli = ContextClient() if context_client is None else context_client
        self.__dev_cli = DeviceClient()  if device_client  is None else device_client
        self.__svc_cli = ServiceClient() if service_client is None else service_client
        self.__slc_cli = SliceClient()   if slice_client   is None else slice_client
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.__results : TypeResults = list()

    @property
    def descriptor_file_path(self) -> Optional[str]: return self.__descriptor_file_path

    @property
    def num_workers(self) -> int: return self.__num_workers

    @property
    def context_client(self) -> Optional[ContextClient]: return self.__ctx_cli

    @property
    def device_client(self) -> Optional[DeviceClient]: return self.__dev_cli

    @property
    def service_client(self) -> Optional[ServiceClient]: return self.__svc_cli

    @property
    def slice_client(self) -> Optional[SliceClient]: return self.__slc_cli

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @property
    def contexts(self) -> List[Dict]: return self.__contexts

    @property
    def num_contexts(self) -> int: return len(self.__contexts)

    @property
    def topologies(self) -> Dict[str, List[Dict]]:
        _topologies = {}
        for topology in self.__topologies:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _topologies.setdefault(context_uuid, []).append(topology)
        return _topologies

    @property
    def num_topologies(self) -> Dict[str, int]:
        _num_topologies = {}
        for topology in self.__topologies:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _num_topologies[context_uuid] = _num_topologies.get(context_uuid, 0) + 1
        return _num_topologies

    @property
    def devices(self) -> List[Dict]: return self.__devices

    @property
    def num_devices(self) -> int: return len(self.__devices)

    @property
    def links(self) -> List[Dict]: return self.__links

    @property
    def num_links(self) -> int: return len(self.__links)

    @property
    def services(self) -> Dict[str, List[Dict]]:
        _services = {}
        for service in self.__services:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = service['service_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _services.setdefault(context_uuid, []).append(service)
        return _services

    @property
    def num_services(self) -> Dict[str, int]:
        _num_services = {}
        for service in self.__services:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = service['service_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _num_services[context_uuid] = _num_services.get(context_uuid, 0) + 1
        return _num_services

    @property
    def slices(self) -> Dict[str, List[Dict]]:
        _slices = {}
        for slice_ in self.__slices:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _slices.setdefault(context_uuid, []).append(slice_)
        return _slices

    @property
    def num_slices(self) -> Dict[str, int]:
        _num_slices = {}
        for slice_ in self.__slices:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid']
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            _num_slices[context_uuid] = _num_slices.get(context_uuid, 0) + 1
        return _num_slices

    @property
    def connections(self) -> List[Dict]: return self.__connections
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @property
    def num_connections(self) -> int: return len(self.__connections)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def process(self) -> TypeResults:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Format CustomConfigRules in Devices, Services and Slices provided in JSON format
        self.__devices  = [format_device_custom_config_rules (device ) for device  in self.__devices ]
        self.__services = [format_service_custom_config_rules(service) for service in self.__services]
        self.__slices   = [format_slice_custom_config_rules  (slice_ ) for slice_  in self.__slices  ]

        # Context and Topology require to create the entity first, and add devices, links, services,
        # slices, etc. in a second stage.
        self.__contexts_add = get_descriptors_add_contexts(self.__contexts)
        self.__topologies_add = get_descriptors_add_topologies(self.__topologies)

        if self.__dummy_mode:
            self._load_dummy_mode()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        else:
            self._load_normal_mode()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        
        return self.__results
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def _load_dummy_mode(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
        self.__ctx_cli.connect()
        self._process_descr('context',    'add',    self.__ctx_cli.SetContext,    Context,    self.__contexts_add  )
        self._process_descr('topology',   'add',    self.__ctx_cli.SetTopology,   Topology,   self.__topologies_add)
        self._process_descr('device',     'add',    self.__ctx_cli.SetDevice,     Device,     self.__devices       )
        self._process_descr('link',       'add',    self.__ctx_cli.SetLink,       Link,       self.__links         )
        self._process_descr('service',    'add',    self.__ctx_cli.SetService,    Service,    self.__services      )
        self._process_descr('slice',      'add',    self.__ctx_cli.SetSlice,      Slice,      self.__slices        )
        self._process_descr('connection', 'add',    self.__ctx_cli.SetConnection, Connection, self.__connections   )

        # Update context and topology is useless:
        # - devices and links are assigned to topologies automatically by Context component
        # - topologies, services, and slices are assigned to contexts automatically by Context component
        #self._process_descr('context',    'update', self.__ctx_cli.SetContext,    Context,    self.__contexts      )
        #self._process_descr('topology',   'update', self.__ctx_cli.SetTopology,   Topology,   self.__topologies    )

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #self.__ctx_cli.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def _load_normal_mode(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # Normal mode: follows the automated workflows in the different components
        assert len(self.__connections) == 0, 'in normal mode, connections should not be set'

        # Device, Service and Slice require to first create the entity and the configure it
        self.__devices_add, self.__devices_config = split_devices_by_rules(self.__devices)
        self.__services_add = get_descriptors_add_services(self.__services)
        self.__slices_add = get_descriptors_add_slices(self.__slices)

        self.__ctx_cli.connect()
        self.__dev_cli.connect()
        self.__svc_cli.connect()
        self.__slc_cli.connect()

        self._process_descr('context',  'add',    self.__ctx_cli.SetContext,      Context,  self.__contexts_add  )
        self._process_descr('topology', 'add',    self.__ctx_cli.SetTopology,     Topology, self.__topologies_add)
        self._process_descr('device',   'add',    self.__dev_cli.AddDevice,       Device,   self.__devices_add   )
        self._process_descr('device',   'config', self.__dev_cli.ConfigureDevice, Device,   self.__devices_config)
        self._process_descr('link',     'add',    self.__ctx_cli.SetLink,         Link,     self.__links         )
        self._process_descr('service',  'add',    self.__svc_cli.CreateService,   Service,  self.__services_add  )
        self._process_descr('service',  'update', self.__svc_cli.UpdateService,   Service,  self.__services      )
        self._process_descr('slice',    'add',    self.__slc_cli.CreateSlice,     Slice,    self.__slices_add    )
        self._process_descr('slice',    'update', self.__slc_cli.UpdateSlice,     Slice,    self.__slices        )
        
        # Update context and topology is useless:
        # - devices and links are assigned to topologies automatically by Context component
        # - topologies, services, and slices are assigned to contexts automatically by Context component
        #self._process_descr('context',  'update', self.__ctx_cli.SetContext,      Context,  self.__contexts      )
        #self._process_descr('topology', 'update', self.__ctx_cli.SetTopology,     Topology, self.__topologies    )
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #self.__slc_cli.close()
        #self.__svc_cli.close()
        #self.__dev_cli.close()
        #self.__ctx_cli.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    @staticmethod
    def worker(grpc_method, grpc_class, entity) -> Any:
        return grpc_method(grpc_class(**entity))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
        num_ok, error_list = 0, []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor:
            future_to_entity = {
                executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity)
                for i,entity in enumerate(entities)
            }

            for future in concurrent.futures.as_completed(future_to_entity):
                i, entity = future_to_entity[future]
                try:
                    _ = future.result()
                    num_ok += 1
                except Exception as e:  # pylint: disable=broad-except
                    error_list.append((i, f'{str(entity)}: {str(e)}'))

        error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.__results.append((entity_name, action_name, num_ok, error_list))

    def validate(self) -> None:
        self.__ctx_cli.connect()

        contexts = self.__ctx_cli.ListContexts(Empty())
        assert len(contexts.contexts) == self.num_contexts

        for context_uuid, num_topologies in self.num_topologies.items():
            response = self.__ctx_cli.ListTopologies(ContextId(**json_context_id(context_uuid)))
            assert len(response.topologies) == num_topologies

        response = self.__ctx_cli.ListDevices(Empty())
        assert len(response.devices) == self.num_devices

        response = self.__ctx_cli.ListLinks(Empty())
        assert len(response.links) == self.num_links

        for context_uuid, num_services in self.num_services.items():
            response = self.__ctx_cli.ListServices(ContextId(**json_context_id(context_uuid)))
            assert len(response.services) == num_services

        for context_uuid, num_slices in self.num_slices.items():
            response = self.__ctx_cli.ListSlices(ContextId(**json_context_id(context_uuid)))
            assert len(response.slices) == num_slices

    def _unload_dummy_mode(self) -> None:
        # Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
        self.__ctx_cli.connect()

        for _, slice_list in self.slices.items():
            for slice_ in slice_list:
                self.__ctx_cli.RemoveSlice(SliceId(**slice_['slice_id']))

        for _, service_list in self.services.items():
            for service in service_list:
                self.__ctx_cli.RemoveService(ServiceId(**service['service_id']))

        for link in self.links:
            self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))

        for device in self.devices:
            self.__ctx_cli.RemoveDevice(DeviceId(**device['device_id']))

        for _, topology_list in self.topologies.items():
            for topology in topology_list:
                self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id']))

        for context in self.contexts:
            self.__ctx_cli.RemoveContext(ContextId(**context['context_id']))

        #self.__ctx_cli.close()

    def _unload_normal_mode(self) -> None:
        # Normal mode: follows the automated workflows in the different components
        self.__ctx_cli.connect()
        self.__dev_cli.connect()
        self.__svc_cli.connect()
        self.__slc_cli.connect()

        for _, slice_list in self.slices.items():
            for slice_ in slice_list:
                self.__slc_cli.DeleteSlice(SliceId(**slice_['slice_id']))

        for _, service_list in self.services.items():
            for service in service_list:
                self.__svc_cli.DeleteService(ServiceId(**service['service_id']))

        for link in self.links:
            self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))

        for device in self.devices:
            self.__dev_cli.DeleteDevice(DeviceId(**device['device_id']))

        for _, topology_list in self.topologies.items():
            for topology in topology_list:
                self.__ctx_cli.RemoveTopology(TopologyId(**topology['topology_id']))

        for context in self.contexts:
            self.__ctx_cli.RemoveContext(ContextId(**context['context_id']))

        #self.__ctx_cli.close()
        #self.__dev_cli.close()
        #self.__svc_cli.close()
        #self.__slc_cli.close()

    def unload(self) -> None:
        if self.__dummy_mode:
            self._unload_dummy_mode()
        else:
            self._unload_normal_mode()

def compose_notifications(results : TypeResults) -> TypeNotificationList:
    notifications = []
    for entity_name, action_name, num_ok, error_list in results:
        entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name]
        action_infinitive, action_past = ACTION_TO_TEXT[action_name]
        num_err = len(error_list)
        for error in error_list:
            notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error'))
        if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success'))
        if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger'))
    return notifications

def check_descriptor_load_results(results : TypeResults, descriptor_loader : DescriptorLoader) -> None:
    num_errors = 0
    for message,level in compose_notifications(results):
        LOGGERS.get(level)(message)
        if level != 'success': num_errors += 1
    if num_errors > 0:
        MSG = 'Failed to load descriptors from "{:s}"'
        raise Exception(MSG.format(str(descriptor_loader.descriptor_file_path)))

def validate_empty_scenario(context_client : ContextClient) -> None:
    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == 0

    response = context_client.ListDevices(Empty())
    assert len(response.devices) == 0

    response = context_client.ListLinks(Empty())
    assert len(response.links) == 0