Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SDN controller descriptor loader
# Usage example (WebUI):
# descriptors = json.loads(descriptors_data_from_client)
# descriptor_loader = DescriptorLoader(descriptors)
# results = descriptor_loader.process()
# for message,level in compose_notifications(results):
# flash(message, level)
# Usage example (pytest):
# with open('path/to/descriptor.json', 'r', encoding='UTF-8') as f:
# descriptors = json.loads(f.read())
# descriptor_loader = DescriptorLoader(
# descriptors, context_client=..., device_client=..., service_client=..., slice_client=...)
# results = descriptor_loader.process()
# loggers = {'success': LOGGER.info, 'danger': LOGGER.error, 'error': LOGGER.error}
# for message,level in compose_notifications(results):
# loggers.get(level)(message)
import concurrent.futures, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, Context, Device, Link, Service, Slice, Topology
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .Tools import (
format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules,
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_devices_by_rules)
LOGGER = logging.getLogger(__name__)
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
ENTITY_TO_TEXT = {
# name => singular, plural
'context' : ('Context', 'Contexts' ),
'topology' : ('Topology', 'Topologies' ),
'device' : ('Device', 'Devices' ),
'link' : ('Link', 'Links' ),
'service' : ('Service', 'Services' ),
'slice' : ('Slice', 'Slices' ),
'connection': ('Connection', 'Connections'),
}
ACTION_TO_TEXT = {
# action => infinitive, past
'add' : ('Add', 'Added'),
'update' : ('Update', 'Updated'),
'config' : ('Configure', 'Configured'),
}
TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_ok, list[error]
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]
def compose_notifications(results : TypeResults) -> TypeNotificationList:
notifications = []
for entity_name, action_name, num_ok, error_list in results:
entity_name_singluar,entity_name_plural = ENTITY_TO_TEXT[entity_name]
action_infinitive, action_past = ACTION_TO_TEXT[action_name]
num_err = len(error_list)
for error in error_list:
notifications.append((f'Unable to {action_infinitive} {entity_name_singluar} {error}', 'error'))
if num_ok : notifications.append((f'{str(num_ok)} {entity_name_plural} {action_past}', 'success'))
if num_err: notifications.append((f'{str(num_err)} {entity_name_plural} failed', 'danger'))
return notifications
class DescriptorLoader:
def __init__(
self, descriptors : Union[str, Dict], num_workers : int = 1,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
) -> None:
self.__descriptors = json.loads(descriptors) if isinstance(descriptors, str) else descriptors
self.__dummy_mode = self.__descriptors.get('dummy_mode' , False)
self.__contexts = self.__descriptors.get('contexts' , [])
self.__topologies = self.__descriptors.get('topologies' , [])
self.__devices = self.__descriptors.get('devices' , [])
self.__links = self.__descriptors.get('links' , [])
self.__services = self.__descriptors.get('services' , [])
self.__slices = self.__descriptors.get('slices' , [])
self.__connections = self.__descriptors.get('connections', [])
self.__num_workers = num_workers
self.__contexts_add = None
self.__topologies_add = None
self.__devices_add = None
self.__devices_config = None
self.__services_add = None
self.__slices_add = None
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
@property
def contexts(self) -> List[Dict]: return self.__contexts
@property
def num_contexts(self) -> int: return len(self.__contexts)
@property
def topologies(self) -> Dict[str, List[Dict]]:
_topologies = {}
for topology in self.__topologies:
context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid']
_topologies.setdefault(context_uuid, []).append(topology)
return _topologies
@property
def num_topologies(self) -> Dict[str, int]:
_num_topologies = {}
for topology in self.__topologies:
context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid']
_num_topologies[context_uuid] = _num_topologies.get(context_uuid, 0) + 1
return _num_topologies
@property
def devices(self) -> List[Dict]: return self.__devices
@property
def num_devices(self) -> int: return len(self.__devices)
@property
def links(self) -> List[Dict]: return self.__links
@property
def num_links(self) -> int: return len(self.__links)
@property
def services(self) -> Dict[str, List[Dict]]:
_services = {}
for service in self.__services:
context_uuid = service['service_id']['context_id']['context_uuid']['uuid']
_services.setdefault(context_uuid, []).append(service)
return _services
@property
def num_services(self) -> Dict[str, int]:
_num_services = {}
for service in self.__services:
context_uuid = service['service_id']['context_id']['context_uuid']['uuid']
_num_services[context_uuid] = _num_services.get(context_uuid, 0) + 1
return _num_services
@property
def slices(self) -> Dict[str, List[Dict]]:
_slices = {}
for slice_ in self.__slices:
context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid']
_slices.setdefault(context_uuid, []).append(slice_)
return _slices
@property
def num_slices(self) -> Dict[str, int]:
_num_slices = {}
for slice_ in self.__slices:
context_uuid = slice_['slice_id']['context_id']['context_uuid']['uuid']
_num_slices[context_uuid] = _num_slices.get(context_uuid, 0) + 1
return _num_slices
@property
def connections(self) -> List[Dict]: return self.__connections
@property
def num_connections(self) -> int: return len(self.__connections)
# Format CustomConfigRules in Devices, Services and Slices provided in JSON format
self.__devices = [format_device_custom_config_rules (device ) for device in self.__devices ]
self.__services = [format_service_custom_config_rules(service) for service in self.__services]
self.__slices = [format_slice_custom_config_rules (slice_ ) for slice_ in self.__slices ]
# Context and Topology require to create the entity first, and add devices, links, services,
# slices, etc. in a second stage.
self.__contexts_add = get_descriptors_add_contexts(self.__contexts)
self.__topologies_add = get_descriptors_add_topologies(self.__topologies)
if self.__dummy_mode:
self._dummy_mode()
else:
self._normal_mode()
def _dummy_mode(self) -> None:
# Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
self.__ctx_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('device', 'add', self.__ctx_cli.SetDevice, Device, self.__devices )
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('service', 'add', self.__ctx_cli.SetService, Service, self.__services )
self._process_descr('slice', 'add', self.__ctx_cli.SetSlice, Slice, self.__slices )
self._process_descr('connection', 'add', self.__ctx_cli.SetConnection, Connection, self.__connections )
self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def _normal_mode(self) -> None:
# Normal mode: follows the automated workflows in the different components
assert len(self.__connections) == 0, 'in normal mode, connections should not be set'
# Device, Service and Slice require to first create the entity and the configure it
self.__devices_add, self.__devices_config = split_devices_by_rules(self.__devices)
self.__services_add = get_descriptors_add_services(self.__services)
self.__slices_add = get_descriptors_add_slices(self.__slices)
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('device', 'add', self.__dev_cli.AddDevice, Device, self.__devices_add )
self._process_descr('device', 'config', self.__dev_cli.ConfigureDevice, Device, self.__devices_config)
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('service', 'add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services )
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
#self.__slc_cli.close()
#self.__svc_cli.close()
#self.__dev_cli.close()
#self.__ctx_cli.close()
@staticmethod
def worker(grpc_method, grpc_class, entity) -> Any:
return grpc_method(grpc_class(**entity))
def _process_descr(self, entity_name, action_name, grpc_method, grpc_class, entities) -> None:
num_ok, error_list = 0, []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.__num_workers) as executor:
future_to_entity = {
executor.submit(DescriptorLoader.worker, grpc_method, grpc_class, entity): (i, entity)
for i,entity in enumerate(entities)
}
for future in concurrent.futures.as_completed(future_to_entity):
i, entity = future_to_entity[future]
try:
_ = future.result()
num_ok += 1
except Exception as e: # pylint: disable=broad-except
error_list.append((i, f'{str(entity)}: {str(e)}'))
error_list = [str_error for _,str_error in sorted(error_list, key=operator.itemgetter(0))]
self.__results.append((entity_name, action_name, num_ok, error_list))