# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging from typing import Any, Dict, List, Optional, Tuple, Union from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.method_wrappers.ServiceExceptions import InvalidArgumentException from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, Link from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.ConfigRules import update_config_rule_custom from common.tools.grpc.Tools import grpc_message_to_json from context.client.ContextClient import ContextClient from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS from .monitoring.MonitoringLoops import MonitoringLoops from .ErrorMessages import ( ERROR_BAD_RESOURCE, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET, ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE, ERROR_UNSUP_RESOURCE ) LOGGER = logging.getLogger(__name__) def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: connection_config_rules = dict() unexpected_config_rules = list() for config_rule in device_config.config_rules: is_action_set = (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) is_custom_rule = (config_rule.WhichOneof('config_rule') == 'custom') if is_action_set and is_custom_rule and (config_rule.custom.resource_key.startswith('_connect/')): connect_attribute = config_rule.custom.resource_key.replace('_connect/', '') connection_config_rules[connect_attribute] = config_rule.custom.resource_value else: unexpected_config_rules.append(config_rule) if len(unexpected_config_rules) > 0: unexpected_config_rules = grpc_message_to_json(device_config) unexpected_config_rules = unexpected_config_rules['config_rules'] unexpected_config_rules = list(filter( lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules, unexpected_config_rules)) str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True) raise InvalidArgumentException( 'device.device_config.config_rules', str_unexpected_config_rules, extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\ 'with "_connect/" tag. Others should be configured after adding the device.') return connection_config_rules def get_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: connect_rules = dict() for config_rule in device_config.config_rules: if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue if config_rule.WhichOneof('config_rule') != 'custom': continue if not config_rule.custom.resource_key.startswith('_connect/'): continue connect_attribute = config_rule.custom.resource_key.replace('_connect/', '') connect_rules[connect_attribute] = config_rule.custom.resource_value return connect_rules def check_no_endpoints(device_endpoints) -> None: if len(device_endpoints) == 0: return unexpected_endpoints = [] for device_endpoint in device_endpoints: unexpected_endpoints.append(grpc_message_to_json(device_endpoint)) str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True) raise InvalidArgumentException( 'device.device_endpoints', str_unexpected_endpoints, extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\ 'interrogation of the physical device.') def get_device_manager_uuid(device : Device) -> Optional[str]: for config_rule in device.device_config.config_rules: if config_rule.WhichOneof('config_rule') != 'custom': continue if config_rule.custom.resource_key != '_manager': continue device_manager_id = json.loads(config_rule.custom.resource_value) return device_manager_id['uuid'] return None def populate_endpoints( device : Device, driver : _Driver, monitoring_loops : MonitoringLoops, new_sub_devices : Dict[str, Device], new_sub_links : Dict[str, Link] ) -> List[str]: device_uuid = device.device_id.device_uuid.uuid device_name = device.name resources_to_get = [RESOURCE_ENDPOINTS] results_getconfig = driver.GetConfig(resources_to_get) LOGGER.debug('results_getconfig = {:s}'.format(str(results_getconfig))) # first quick pass to identify need of mgmt endpoints and links add_mgmt_port = False for resource_data in results_getconfig: if len(resource_data) != 2: continue resource_key, _ = resource_data if resource_key.startswith('/devices/device'): add_mgmt_port = True break if add_mgmt_port: # add mgmt port to main device device_mgmt_endpoint = device.device_endpoints.add() device_mgmt_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME device_mgmt_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME device_mgmt_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid device_mgmt_endpoint.endpoint_id.endpoint_uuid.uuid = 'mgmt' device_mgmt_endpoint.name = 'mgmt' device_mgmt_endpoint.endpoint_type = 'mgmt' errors : List[str] = list() for resource_data in results_getconfig: if len(resource_data) != 2: errors.append(ERROR_BAD_RESOURCE.format(device_uuid=device_uuid, resource_data=str(resource_data))) continue resource_key, resource_value = resource_data if isinstance(resource_value, Exception): errors.append(ERROR_GET.format( device_uuid=device_uuid, resource_key=str(resource_key), error=str(resource_value))) continue if resource_value is None: continue if resource_key.startswith('/devices/device'): # create sub-device _sub_device_uuid = resource_value['uuid'] _sub_device = Device() _sub_device.device_id.device_uuid.uuid = _sub_device_uuid # pylint: disable=no-member _sub_device.name = resource_value['name'] _sub_device.device_type = resource_value['type'] _sub_device.device_operational_status = resource_value['status'] # Sub-devices should not have a driver assigned. Instead, they should have # a config rule specifying their manager. #_sub_device.device_drivers.extend(resource_value['drivers']) # pylint: disable=no-member manager_config_rule = _sub_device.device_config.config_rules.add() manager_config_rule.action = ConfigActionEnum.CONFIGACTION_SET manager_config_rule.custom.resource_key = '_manager' manager = {'uuid': device_uuid, 'name': device_name} manager_config_rule.custom.resource_value = json.dumps(manager, indent=0, sort_keys=True) new_sub_devices[_sub_device_uuid] = _sub_device # add mgmt port to sub-device _sub_device_mgmt_endpoint = _sub_device.device_endpoints.add() # pylint: disable=no-member _sub_device_mgmt_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME _sub_device_mgmt_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME _sub_device_mgmt_endpoint.endpoint_id.device_id.device_uuid.uuid = _sub_device_uuid _sub_device_mgmt_endpoint.endpoint_id.endpoint_uuid.uuid = 'mgmt' _sub_device_mgmt_endpoint.name = 'mgmt' _sub_device_mgmt_endpoint.endpoint_type = 'mgmt' # add mgmt link _mgmt_link_uuid = '{:s}/{:s}=={:s}/{:s}'.format(device_name, 'mgmt', _sub_device.name, 'mgmt') _mgmt_link = Link() _mgmt_link.link_id.link_uuid.uuid = _mgmt_link_uuid # pylint: disable=no-member _mgmt_link.name = _mgmt_link_uuid _mgmt_link.link_endpoint_ids.append(device_mgmt_endpoint.endpoint_id) # pylint: disable=no-member _mgmt_link.link_endpoint_ids.append(_sub_device_mgmt_endpoint.endpoint_id) # pylint: disable=no-member new_sub_links[_mgmt_link_uuid] = _mgmt_link elif resource_key.startswith('/endpoints/endpoint'): endpoint_uuid = resource_value['uuid'] _device_uuid = resource_value.get('device_uuid') endpoint_name = resource_value.get('name') if _device_uuid is None: # add endpoint to current device device_endpoint = device.device_endpoints.add() device_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid else: # add endpoint to specified device device_endpoint = new_sub_devices[_device_uuid].device_endpoints.add() device_endpoint.endpoint_id.device_id.device_uuid.uuid = _device_uuid device_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME device_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME device_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid if endpoint_name is not None: device_endpoint.name = endpoint_name device_endpoint.endpoint_type = resource_value.get('type', '-') sample_types : Dict[int, str] = resource_value.get('sample_types', {}) for kpi_sample_type, monitor_resource_key in sample_types.items(): device_endpoint.kpi_sample_types.append(kpi_sample_type) monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key) elif resource_key.startswith('/links/link'): # create sub-link _sub_link_uuid = resource_value['uuid'] _sub_link = Link() _sub_link.link_id.link_uuid.uuid = _sub_link_uuid # pylint: disable=no-member _sub_link.name = resource_value['name'] new_sub_links[_sub_link_uuid] = _sub_link for device_uuid,endpoint_uuid in resource_value['name']: _sub_link_endpoint_id = _sub_link.link_endpoint_ids.add() # pylint: disable=no-member _sub_link_endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME _sub_link_endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME _sub_link_endpoint_id.device_id.device_uuid.uuid = device_uuid _sub_link_endpoint_id.endpoint_uuid.uuid = endpoint_uuid else: errors.append(ERROR_UNSUP_RESOURCE.format(device_uuid=device_uuid, resource_data=str(resource_data))) continue return errors def populate_endpoint_monitoring_resources(device_with_uuids : Device, monitoring_loops : MonitoringLoops) -> None: device_uuid = device_with_uuids.device_id.device_uuid.uuid for endpoint in device_with_uuids.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_name = endpoint.name kpi_sample_types = endpoint.kpi_sample_types for kpi_sample_type in kpi_sample_types: monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if monitor_resource_key is not None: continue monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_name, kpi_sample_type) if monitor_resource_key is None: continue monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key) def _raw_config_rules_to_grpc( device_uuid : str, device_config : DeviceConfig, error_template : str, config_action : ConfigActionEnum, raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]] ) -> List[str]: errors : List[str] = list() for resource_key, resource_value in raw_config_rules: if isinstance(resource_value, Exception): errors.append(error_template.format( device_uuid=device_uuid, resource_key=str(resource_key), resource_value=str(resource_value), error=str(resource_value))) continue if resource_value is None: continue resource_value = json.loads(resource_value) if isinstance(resource_value, str) else resource_value resource_value = {field_name : (field_value, False) for field_name,field_value in resource_value.items()} update_config_rule_custom(device_config.config_rules, resource_key, resource_value, new_action=config_action) return errors def populate_config_rules(device : Device, driver : _Driver) -> List[str]: device_uuid = device.device_id.device_uuid.uuid results_getconfig = driver.GetConfig() return _raw_config_rules_to_grpc( device_uuid, device.device_config, ERROR_GET, ConfigActionEnum.CONFIGACTION_SET, results_getconfig) def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]: results_getinitconfig = driver.GetInitialConfig() return _raw_config_rules_to_grpc( device_uuid, device_config, ERROR_GET_INIT, ConfigActionEnum.CONFIGACTION_SET, results_getinitconfig) def compute_rules_to_add_delete( device : Device, request : Device ) -> Tuple[List[Tuple[str, Any]], List[Tuple[str, Any]]]: # convert config rules from context into a dictionary # TODO: add support for non-custom config rules context_config_rules = { config_rule.custom.resource_key: config_rule.custom.resource_value for config_rule in device.device_config.config_rules if config_rule.WhichOneof('config_rule') == 'custom' } # convert config rules from request into a list # TODO: add support for non-custom config rules request_config_rules = [ (config_rule.action, config_rule.custom.resource_key, config_rule.custom.resource_value) for config_rule in request.device_config.config_rules if config_rule.WhichOneof('config_rule') == 'custom' ] resources_to_set : List[Tuple[str, Any]] = [] # key, value resources_to_delete : List[Tuple[str, Any]] = [] # key, value for action, key, value in request_config_rules: if action == ConfigActionEnum.CONFIGACTION_SET: if (key in context_config_rules) and (context_config_rules[key][0] == value): continue resources_to_set.append((key, value)) elif action == ConfigActionEnum.CONFIGACTION_DELETE: if key not in context_config_rules: continue resources_to_delete.append((key, value)) return resources_to_set, resources_to_delete def configure_rules(device : Device, driver : _Driver, resources_to_set : List[Tuple[str, Any]]) -> List[str]: if len(resources_to_set) == 0: return [] results_setconfig = driver.SetConfig(resources_to_set) results_setconfig = [ (resource_key, result if isinstance(result, Exception) else resource_value) for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig) ] device_uuid = device.device_id.device_uuid.uuid return _raw_config_rules_to_grpc( device_uuid, device.device_config, ERROR_SET, ConfigActionEnum.CONFIGACTION_SET, results_setconfig) def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]: if len(resources_to_delete) == 0: return [] results_deleteconfig = driver.DeleteConfig(resources_to_delete) results_deleteconfig = [ (resource_key, result if isinstance(result, Exception) else resource_value) for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig) ] device_uuid = device.device_id.device_uuid.uuid return _raw_config_rules_to_grpc( device_uuid, device.device_config, ERROR_DELETE, ConfigActionEnum.CONFIGACTION_DELETE, results_deleteconfig) def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: kpi_uuid = request.kpi_id.kpi_id.uuid device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid kpi_sample_type = request.kpi_descriptor.kpi_sample_type resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if resource_key is None: kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') MSG = ERROR_SAMPLETYPE.format( device_uuid=str(device_uuid), endpoint_uuid=str(endpoint_uuid), sample_type_id=str(kpi_sample_type), sample_type_name=str(kpi_sample_type_name) ) LOGGER.warning('{:s} Supported Device-Endpoint-KpiSampleType items: {:s}'.format( MSG, str(monitoring_loops.get_all_resource_keys()))) return [MSG] sampling_duration = request.sampling_duration_s # seconds sampling_interval = request.sampling_interval_s # seconds resources_to_subscribe = [(resource_key, sampling_duration, sampling_interval)] results_subscribestate = driver.SubscribeState(resources_to_subscribe) errors : List[str] = list() for (resource_key, duration, interval), result in zip(resources_to_subscribe, results_subscribestate): if isinstance(result, Exception): errors.append(ERROR_SUBSCRIBE.format( device_uuid=str(device_uuid), subscr_key=str(resource_key), subscr_duration=str(duration), subscr_interval=str(interval), error=str(result) )) continue monitoring_loops.add_kpi(device_uuid, resource_key, kpi_uuid, sampling_duration, sampling_interval) monitoring_loops.add_device(device_uuid, driver) return errors def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: kpi_uuid = request.kpi_id.kpi_id.uuid kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid) if kpi_details is None: return [ERROR_MISSING_KPI.format(kpi_uuid=str(kpi_uuid))] device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details resources_to_unsubscribe = [(resource_key, sampling_duration, sampling_interval)] results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) errors : List[str] = list() for (resource_key, duration, interval), result in zip(resources_to_unsubscribe, results_unsubscribestate): if isinstance(result, Exception): errors.append(ERROR_UNSUBSCRIBE.format( device_uuid=str(device_uuid), subscr_key=str(resource_key), subscr_duration=str(duration), subscr_interval=str(interval), error=str(result) )) continue monitoring_loops.remove_kpi(kpi_uuid) #monitoring_loops.remove_device(device_uuid) # Do not remove; one monitoring_loop/device used by multiple requests return errors def update_endpoints(src_device : Device, dst_device : Device) -> None: for src_endpoint in src_device.device_endpoints: src_device_uuid = src_endpoint.endpoint_id.device_id.device_uuid.uuid src_endpoint_uuid = src_endpoint.endpoint_id.endpoint_uuid.uuid src_context_uuid = src_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid src_topology_uuid = src_endpoint.endpoint_id.topology_id.topology_uuid.uuid for dst_endpoint in dst_device.device_endpoints: dst_endpoint_id = dst_endpoint.endpoint_id if src_endpoint_uuid not in {dst_endpoint_id.endpoint_uuid.uuid, dst_endpoint.name}: continue if src_device_uuid != dst_endpoint_id.device_id.device_uuid.uuid: continue dst_topology_id = dst_endpoint_id.topology_id if len(src_topology_uuid) > 0 and src_topology_uuid != dst_topology_id.topology_uuid.uuid: continue if len(src_context_uuid) > 0 and src_context_uuid != dst_topology_id.context_id.context_uuid.uuid: continue break # found, do nothing else: # not found, add it dst_endpoint = dst_device.device_endpoints.add() # pylint: disable=no-member dst_endpoint_id = dst_endpoint.endpoint_id dst_endpoint_id.endpoint_uuid.uuid = src_endpoint_uuid dst_endpoint_id.device_id.device_uuid.uuid = src_device_uuid dst_topology_id = dst_endpoint_id.topology_id if len(src_topology_uuid) > 0: dst_topology_id.topology_uuid.uuid = src_topology_uuid if len(src_context_uuid) > 0: dst_topology_id.context_id.context_uuid.uuid = src_context_uuid