Skip to content
Snippets Groups Projects
Commit e1495512 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common:

- new generic descriptor loader tool
- new scenario loader for automated tests
parent c7b10a86
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!27New Descriptor Loader Framework and OFC'22 code migrations
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
from common.tools.descriptor.Loader import DescriptorLoader, compose_notifications
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
LOGGER = logging.getLogger(__name__)
LOGGERS = {
'success': LOGGER.info,
'danger' : LOGGER.error,
'error' : LOGGER.error,
}
def load_scenario_from_descriptor(
descriptor_file : str, context_client : ContextClient, device_client : DeviceClient,
service_client : ServiceClient, slice_client : SliceClient
) -> None:
with open(descriptor_file, 'r', encoding='UTF-8') as f:
descriptors = json.loads(f.read())
descriptor_loader = DescriptorLoader(
context_client=context_client, device_client=device_client,
service_client=service_client, slice_client=slice_client)
descriptor_loader.process_descriptors(descriptors)
results = descriptor_loader.get_results()
num_errors = 0
for message,level in compose_notifications(results):
LOGGERS.get(level)(message)
if level != 'success': num_errors += 1
if num_errors > 0:
MSG = 'Failed to load descriptors in file {:s}'
raise Exception(MSG.format(str(descriptor_file)))
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SDN controller descriptor loader
# Usage example (WebUI):
# descriptors = json.loads(descriptors_data_from_client)
#
# descriptor_loader = DescriptorLoader()
# descriptor_loader.process_descriptors(descriptors)
# results = descriptor_loader.get_results()
# for message,level in compose_notifications(results):
# flash(message, level)
# Usage example (pytest):
# with open('path/to/descriptor.json', 'r', encoding='UTF-8') as f:
# descriptors = json.loads(f.read())
#
# descriptor_loader = DescriptorLoader()
# descriptor_loader.process_descriptors(descriptors)
# results = descriptor_loader.get_results()
# loggers = {'success': LOGGER.info, 'danger': LOGGER.error, 'error': LOGGER.error}
# for message,level in compose_notifications(results):
# loggers.get(level)(message)
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .Tools import (
format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules,
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_devices_by_rules)
ENTITY_TO_TEXT = {
# name => singular, plural
'context' : ('Context', 'Contexts' ),
'topology' : ('Topology', 'Topologies' ),
'device' : ('Device', 'Devices' ),
'link' : ('Link', 'Links' ),
'service' : ('Service', 'Services' ),
'slice' : ('Slice', 'Slices' ),
'connection': ('Connection', 'Connections'),
}
ACTION_TO_TEXT = {
# action => infinitive, past
'add' : ('Add', 'Added'),
'update' : ('Update', 'Updated'),
'config' : ('Configure', 'Configured'),
}
TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_ok, list[error]
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]
def compose_notifications(results : TypeResults) -> TypeNotificationList:
notifications = []
for entity_name, action_name, num_ok, error_list in results:
entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name]
action_infinitive, action_past = ACTION_TO_TEXT[action_name]
num_err = len(error_list)
for error in error_list:
notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error'))
if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success'))
if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger'))
return notifications
class DescriptorLoader:
def __init__(
self, context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
) -> None:
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
self.__results : TypeResults = list()
self.__connections = None
self.__contexts = None
self.__contexts_add = None
self.__devices = None
self.__devices_add = None
self.__devices_config = None
self.__dummy_mode = None
self.__links = None
self.__services = None
self.__services_add = None
self.__slices = None
self.__slices_add = None
self.__topologies = None
self.__topologies_add = None
def get_results(self) -> TypeResults: return self.__results
def process_descriptors(self, descriptors : Dict) -> None:
self.__dummy_mode = descriptors.get('dummy_mode' , False)
self.__contexts = descriptors.get('contexts' , [])
self.__topologies = descriptors.get('topologies' , [])
self.__devices = descriptors.get('devices' , [])
self.__links = descriptors.get('links' , [])
self.__services = descriptors.get('services' , [])
self.__slices = descriptors.get('slices' , [])
self.__connections = descriptors.get('connections', [])
# Format CustomConfigRules in Devices, Services and Slices provided in JSON format
self.__devices = [format_device_custom_config_rules (device ) for device in self.__devices ]
self.__services = [format_service_custom_config_rules(service) for service in self.__services]
self.__slices = [format_slice_custom_config_rules (slice_ ) for slice_ in self.__slices ]
# Context and Topology require to create the entity first, and add devices, links, services,
# slices, etc. in a second stage.
self.__contexts_add = get_descriptors_add_contexts(self.__contexts)
self.__topologies_add = get_descriptors_add_topologies(self.__topologies)
if self.__dummy_mode:
self._dummy_mode()
else:
self._normal_mode()
def _dummy_mode(self) -> None:
# Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
self.__ctx_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('device', 'add', self.__ctx_cli.SetDevice, Device, self.__devices )
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('service', 'add', self.__ctx_cli.SetService, Service, self.__services )
self._process_descr('slice', 'add', self.__ctx_cli.SetSlice, Slice, self.__slices )
self._process_descr('connection', 'add', self.__ctx_cli.SetConnection, Connection, self.__connections )
self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
self.__ctx_cli.close()
def _normal_mode(self) -> None:
# Normal mode: follows the automated workflows in the different components
assert len(self.__connections) == 0, 'in normal mode, connections should not be set'
# Device, Service and Slice require to first create the entity and the configure it
self.__devices_add, self.__devices_config = split_devices_by_rules(self.__devices)
self.__services_add = get_descriptors_add_services(self.__services)
self.__slices_add = get_descriptors_add_slices(self.__slices)
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('device', 'add', self.__dev_cli.AddDevice, Device, self.__devices_add )
self._process_descr('device', 'config', self.__dev_cli.ConfigureDevice, Device, self.__devices_config)
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('service', 'add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services )
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
self.__slc_cli.close()
self.__svc_cli.close()
self.__dev_cli.close()
self.__ctx_cli.close()
def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
num_ok, error_list = 0, []
for entity in entities:
try:
grpc_method(grpc_class(**entity))
num_ok += 1
except Exception as e: # pylint: disable=broad-except
error_list.append(f'{str(entity)}: {str(e)}')
num_err += 1
self.__results.append((entity_name, action_name, num_ok, error_list))
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, json
from typing import Dict, List, Optional, Tuple, Union
def get_descriptors_add_contexts(contexts : List[Dict]) -> List[Dict]:
contexts_add = copy.deepcopy(contexts)
for context in contexts_add:
context['topology_ids'] = []
context['service_ids'] = []
return contexts_add
def get_descriptors_add_topologies(topologies : List[Dict]) -> List[Dict]:
topologies_add = copy.deepcopy(topologies)
for topology in topologies_add:
topology['device_ids'] = []
topology['link_ids'] = []
return topologies_add
def get_descriptors_add_services(services : List[Dict]) -> List[Dict]:
services_add = []
for service in services:
service_copy = copy.deepcopy(service)
service_copy['service_endpoint_ids'] = []
service_copy['service_constraints'] = []
service_copy['service_config'] = {'config_rules': []}
services_add.append(service_copy)
return services_add
def get_descriptors_add_slices(slices : List[Dict]) -> List[Dict]:
slices_add = []
for slice_ in slices:
slice_copy = copy.deepcopy(slice_)
slice_copy['slice_endpoint_ids'] = []
slice_copy['slice_constraints'] = []
slice_copy['slice_config'] = {'config_rules': []}
slices_add.append(slice_copy)
return slices_add
TypeResourceValue = Union[str, int, bool, float, dict, list]
def format_custom_config_rules(config_rules : List[Dict]) -> List[Dict]:
for config_rule in config_rules:
if 'custom' not in config_rule: continue
custom_resource_value : TypeResourceValue = config_rule['custom']['resource_value']
if isinstance(custom_resource_value, (dict, list)):
custom_resource_value = json.dumps(custom_resource_value, sort_keys=True, indent=0)
config_rule['custom']['resource_value'] = custom_resource_value
return config_rules
def format_device_custom_config_rules(device : Dict) -> Dict:
config_rules = device.get('device_config', {}).get('config_rules', [])
config_rules = format_custom_config_rules(config_rules)
device['device_config']['config_rules'] = config_rules
return device
def format_service_custom_config_rules(service : Dict) -> Dict:
config_rules = service.get('service_config', {}).get('config_rules', [])
config_rules = format_custom_config_rules(config_rules)
service['service_config']['config_rules'] = config_rules
return service
def format_slice_custom_config_rules(slice_ : Dict) -> Dict:
config_rules = slice_.get('service_config', {}).get('config_rules', [])
config_rules = format_custom_config_rules(config_rules)
slice_['service_config']['config_rules'] = config_rules
return slice_
def split_devices_by_rules(devices : List[Dict]) -> Tuple[List[Dict], List[Dict]]:
devices_add = []
devices_config = []
for device in devices:
connect_rules = []
config_rules = []
for config_rule in device.get('device_config', {}).get('config_rules', []):
custom_resource_key : Optional[str] = config_rule.get('custom', {}).get('resource_key')
if custom_resource_key is not None and custom_resource_key.startswith('_connect/'):
connect_rules.append(config_rule)
else:
config_rules.append(config_rule)
if len(connect_rules) > 0:
device_add = copy.deepcopy(device)
device_add['device_endpoints'] = []
device_add['device_config'] = {'config_rules': connect_rules}
devices_add.append(device_add)
if len(config_rules) > 0:
device['device_config'] = {'config_rules': config_rules}
devices_config.append(device)
return devices_add, devices_config
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment