Skip to content
Snippets Groups Projects
Select Git revision
  • ededfa65e0f75f054570057a6a242f1371675a5e
  • master default
  • cnit_ofc26
  • feat/344-implement-a-new-firewall-agent-controllable-through-restconf-openconfig
  • feat/343-integration-of-mimir-deployment-in-production-environment
  • ofc_polimi
  • feat/305-cttc-enhanced-netconf-openconfig-sbi-driver-for-dscm-pluggables
  • feat/306-cttc-enhanced-restconf-based-openconfig-nbi-for-dscm-pluggables
  • feat/301-cttc-dscm-pluggables
  • CTTC-IMPLEMENT-NBI-CONNECTOR-NOS-ZTP
  • CTTC-TEST-SMARTNICS-6GMICROSDN-ZTP
  • develop protected
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit_tapi
  • feat/330-tid-pcep-component
  • feat/tid-newer-pcep-component
  • feat/116-ubi-updates-in-telemetry-backend-to-support-p4-in-band-network-telemetry
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

CommonObjects.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Tools.py 21.21 KiB
    # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import json, logging
    from typing import Any, Dict, List, Optional, Tuple, Union
    from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
    from common.method_wrappers.ServiceExceptions import InvalidArgumentException
    from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, Link
    from common.proto.device_pb2 import MonitoringSettings
    from common.proto.kpi_sample_types_pb2 import KpiSampleType
    from common.tools.grpc.ConfigRules import update_config_rule_custom
    from common.tools.grpc.Tools import grpc_message_to_json
    from context.client.ContextClient import ContextClient
    from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
    from .monitoring.MonitoringLoops import MonitoringLoops
    from .ErrorMessages import (
        ERROR_BAD_RESOURCE, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET,
        ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE, ERROR_UNSUP_RESOURCE
    )
    
    LOGGER = logging.getLogger(__name__)
    
    def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
        connection_config_rules = dict()
        unexpected_config_rules = list()
        for config_rule in device_config.config_rules:
            is_action_set = (config_rule.action == ConfigActionEnum.CONFIGACTION_SET)
            is_custom_rule = (config_rule.WhichOneof('config_rule') == 'custom')
            if is_action_set and is_custom_rule and (config_rule.custom.resource_key.startswith('_connect/')):
                connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
                connection_config_rules[connect_attribute] = config_rule.custom.resource_value
            else:
                unexpected_config_rules.append(config_rule)
    
        if len(unexpected_config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(device_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            unexpected_config_rules = list(filter(
                lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules,
                unexpected_config_rules))
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                                'with "_connect/" tag. Others should be configured after adding the device.')
    
        return connection_config_rules
    
    def get_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
        connect_rules = dict()
        for config_rule in device_config.config_rules:
            if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
            if config_rule.WhichOneof('config_rule') != 'custom': continue
            if not config_rule.custom.resource_key.startswith('_connect/'): continue
            connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
            connect_rules[connect_attribute] = config_rule.custom.resource_value
        return connect_rules
    
    def check_no_endpoints(device_endpoints) -> None:
        if len(device_endpoints) == 0: return
        unexpected_endpoints = []
        for device_endpoint in device_endpoints:
            unexpected_endpoints.append(grpc_message_to_json(device_endpoint))
        str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
        raise InvalidArgumentException(
            'device.device_endpoints', str_unexpected_endpoints,
            extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
                            'interrogation of the physical device.')
    
    def get_device_controller_uuid(device : Device) -> Optional[str]:
        for config_rule in device.device_config.config_rules:
            if config_rule.WhichOneof('config_rule') != 'custom': continue
            if config_rule.custom.resource_key != '_controller': continue
            device_controller_id = json.loads(config_rule.custom.resource_value)
            return device_controller_id['uuid']
        return None
    
    def populate_endpoints(
        device : Device, driver : _Driver, monitoring_loops : MonitoringLoops,
        new_sub_devices : Dict[str, Device], new_sub_links : Dict[str, Link]
    ) -> List[str]:
        device_uuid = device.device_id.device_uuid.uuid
        device_name = device.name
    
        resources_to_get = [RESOURCE_ENDPOINTS]
        results_getconfig = driver.GetConfig(resources_to_get)
        LOGGER.debug('results_getconfig = {:s}'.format(str(results_getconfig)))
    
        # first quick pass to identify need of mgmt endpoints and links
        add_mgmt_port = False
        for resource_data in results_getconfig:
            if len(resource_data) != 2: continue
            resource_key, _ = resource_data
            if resource_key.startswith('/devices/device'):
                add_mgmt_port = True
                break
    
        if add_mgmt_port:
            # add mgmt port to main device
            device_mgmt_endpoint = device.device_endpoints.add()
            device_mgmt_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
            device_mgmt_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
            device_mgmt_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid
            device_mgmt_endpoint.endpoint_id.endpoint_uuid.uuid = 'mgmt'
            device_mgmt_endpoint.name = 'mgmt'
            device_mgmt_endpoint.endpoint_type = 'mgmt'
    
        errors : List[str] = list()
        for resource_data in results_getconfig:
            if len(resource_data) != 2:
                errors.append(ERROR_BAD_RESOURCE.format(device_uuid=device_uuid, resource_data=str(resource_data)))
                continue
    
            resource_key, resource_value = resource_data
            if isinstance(resource_value, Exception):
                errors.append(ERROR_GET.format(
                    device_uuid=device_uuid, resource_key=str(resource_key), error=str(resource_value)))
                continue
            if resource_value is None:
                continue
    
            if resource_key.startswith('/devices/device'):
                # create sub-device
                _sub_device_uuid = resource_value['uuid']
                _sub_device = Device()
                _sub_device.device_id.device_uuid.uuid = _sub_device_uuid           # pylint: disable=no-member
                _sub_device.name = resource_value['name']
                _sub_device.device_type = resource_value['type']
                _sub_device.device_operational_status = resource_value['status']
                
                # Sub-devices should not have a driver assigned. Instead, they should have
                # a config rule specifying their controller.
                #_sub_device.device_drivers.extend(resource_value['drivers'])        # pylint: disable=no-member
                controller_config_rule = _sub_device.device_config.config_rules.add()
                controller_config_rule.action = ConfigActionEnum.CONFIGACTION_SET
                controller_config_rule.custom.resource_key = '_controller'
                controller = {'uuid': device_uuid, 'name': device_name}
                controller_config_rule.custom.resource_value = json.dumps(controller, indent=0, sort_keys=True)
    
                new_sub_devices[_sub_device_uuid] = _sub_device
    
                # add mgmt port to sub-device
                _sub_device_mgmt_endpoint = _sub_device.device_endpoints.add()      # pylint: disable=no-member
                _sub_device_mgmt_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                _sub_device_mgmt_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
                _sub_device_mgmt_endpoint.endpoint_id.device_id.device_uuid.uuid = _sub_device_uuid
                _sub_device_mgmt_endpoint.endpoint_id.endpoint_uuid.uuid = 'mgmt'
                _sub_device_mgmt_endpoint.name = 'mgmt'
                _sub_device_mgmt_endpoint.endpoint_type = 'mgmt'
    
                # add mgmt link
                _mgmt_link_uuid = '{:s}/{:s}=={:s}/{:s}'.format(device_name, 'mgmt', _sub_device.name, 'mgmt')
                _mgmt_link = Link()
                _mgmt_link.link_id.link_uuid.uuid = _mgmt_link_uuid                         # pylint: disable=no-member
                _mgmt_link.name = _mgmt_link_uuid
                _mgmt_link.link_endpoint_ids.append(device_mgmt_endpoint.endpoint_id)       # pylint: disable=no-member
                _mgmt_link.link_endpoint_ids.append(_sub_device_mgmt_endpoint.endpoint_id)  # pylint: disable=no-member
                new_sub_links[_mgmt_link_uuid] = _mgmt_link
    
            elif resource_key.startswith('/endpoints/endpoint'):
                endpoint_uuid = resource_value['uuid']
                _device_uuid = resource_value.get('device_uuid')
                endpoint_name = resource_value.get('name')
    
                if _device_uuid is None:
                    # add endpoint to current device
                    device_endpoint = device.device_endpoints.add()
                    device_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid
                else:
                    # add endpoint to specified device
                    device_endpoint = new_sub_devices[_device_uuid].device_endpoints.add()
                    device_endpoint.endpoint_id.device_id.device_uuid.uuid = _device_uuid
    
                device_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                device_endpoint.endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
                
                device_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid
                if endpoint_name is not None: device_endpoint.name = endpoint_name
                device_endpoint.endpoint_type = resource_value.get('type', '-')
    
                sample_types : Dict[int, str] = resource_value.get('sample_types', {})
                for kpi_sample_type, monitor_resource_key in sample_types.items():
                    device_endpoint.kpi_sample_types.append(kpi_sample_type)
                    monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key)
    
            elif resource_key.startswith('/links/link'):
                # create sub-link
                _sub_link_uuid = resource_value['uuid']
                _sub_link = Link()
                _sub_link.link_id.link_uuid.uuid = _sub_link_uuid           # pylint: disable=no-member
                _sub_link.name = resource_value['name']
                new_sub_links[_sub_link_uuid] = _sub_link
    
                for device_uuid,endpoint_uuid in resource_value['endpoints']:
                    _sub_link_endpoint_id = _sub_link.link_endpoint_ids.add()      # pylint: disable=no-member
                    _sub_link_endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                    _sub_link_endpoint_id.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
                    _sub_link_endpoint_id.device_id.device_uuid.uuid = device_uuid
                    _sub_link_endpoint_id.endpoint_uuid.uuid = endpoint_uuid
    
            else:
                errors.append(ERROR_UNSUP_RESOURCE.format(device_uuid=device_uuid, resource_data=str(resource_data)))
                continue
    
        return errors
    
    def populate_endpoint_monitoring_resources(device_with_uuids : Device, monitoring_loops : MonitoringLoops) -> None:
        device_uuid = device_with_uuids.device_id.device_uuid.uuid
    
        for endpoint in device_with_uuids.device_endpoints:
            endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
            endpoint_name = endpoint.name
            kpi_sample_types = endpoint.kpi_sample_types
            for kpi_sample_type in kpi_sample_types:
                monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
                if monitor_resource_key is not None: continue
    
                monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_name, kpi_sample_type)
                if monitor_resource_key is None: continue
                monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key)
    
    def _raw_config_rules_to_grpc(
        device_uuid : str, device_config : DeviceConfig, error_template : str, config_action : ConfigActionEnum,
        raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]]
    ) -> List[str]:
        errors : List[str] = list()
    
        for resource_key, resource_value in raw_config_rules:
            if isinstance(resource_value, Exception):
                errors.append(error_template.format(
                    device_uuid=device_uuid, resource_key=str(resource_key), resource_value=str(resource_value),
                    error=str(resource_value)))
                continue
    
            if resource_value is None: continue
            resource_value = json.loads(resource_value) if isinstance(resource_value, str) else resource_value
            resource_value = {field_name : (field_value, False) for field_name,field_value in resource_value.items()}
            update_config_rule_custom(device_config.config_rules, resource_key, resource_value, new_action=config_action)
    
        return errors
    
    def populate_config_rules(device : Device, driver : _Driver) -> List[str]:
        device_uuid = device.device_id.device_uuid.uuid
        results_getconfig = driver.GetConfig()
        return _raw_config_rules_to_grpc(
            device_uuid, device.device_config, ERROR_GET, ConfigActionEnum.CONFIGACTION_SET, results_getconfig)
    
    def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]:
        results_getinitconfig = driver.GetInitialConfig()
        return _raw_config_rules_to_grpc(
            device_uuid, device_config, ERROR_GET_INIT, ConfigActionEnum.CONFIGACTION_SET, results_getinitconfig)
    
    def compute_rules_to_add_delete(
        device : Device, request : Device
    ) -> Tuple[List[Tuple[str, Any]], List[Tuple[str, Any]]]:
        # convert config rules from context into a dictionary
        # TODO: add support for non-custom config rules
        context_config_rules = {
            config_rule.custom.resource_key: config_rule.custom.resource_value
            for config_rule in device.device_config.config_rules
            if config_rule.WhichOneof('config_rule') == 'custom'
        }
    
        # convert config rules from request into a list
        # TODO: add support for non-custom config rules
        request_config_rules = [
            (config_rule.action, config_rule.custom.resource_key, config_rule.custom.resource_value)
            for config_rule in request.device_config.config_rules
            if config_rule.WhichOneof('config_rule') == 'custom'
        ]
    
        resources_to_set    : List[Tuple[str, Any]] = [] # key, value
        resources_to_delete : List[Tuple[str, Any]] = [] # key, value
    
        for action, key, value in request_config_rules:
            if action == ConfigActionEnum.CONFIGACTION_SET:
                if (key in context_config_rules) and (context_config_rules[key][0] == value): continue
                resources_to_set.append((key, value))
            elif action == ConfigActionEnum.CONFIGACTION_DELETE:
                if key not in context_config_rules: continue
                resources_to_delete.append((key, value))
    
        return resources_to_set, resources_to_delete
    
    def configure_rules(device : Device, driver : _Driver, resources_to_set : List[Tuple[str, Any]]) -> List[str]:
        if len(resources_to_set) == 0: return []
    
        results_setconfig = driver.SetConfig(resources_to_set)
        results_setconfig = [
            (resource_key, result if isinstance(result, Exception) else resource_value)
            for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig)
        ]
    
        device_uuid = device.device_id.device_uuid.uuid
        return _raw_config_rules_to_grpc(
            device_uuid, device.device_config, ERROR_SET, ConfigActionEnum.CONFIGACTION_SET, results_setconfig)
    
    def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]:
        if len(resources_to_delete) == 0: return []
    
        results_deleteconfig = driver.DeleteConfig(resources_to_delete)
        results_deleteconfig = [
            (resource_key, result if isinstance(result, Exception) else resource_value)
            for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig)
        ]
    
        device_uuid = device.device_id.device_uuid.uuid
        return _raw_config_rules_to_grpc(
            device_uuid, device.device_config, ERROR_DELETE, ConfigActionEnum.CONFIGACTION_DELETE, results_deleteconfig)
    
    def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
        kpi_uuid = request.kpi_id.kpi_id.uuid
        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
        kpi_sample_type = request.kpi_descriptor.kpi_sample_type
    
        resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
        if resource_key is None:
            kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
            MSG = ERROR_SAMPLETYPE.format(
                device_uuid=str(device_uuid), endpoint_uuid=str(endpoint_uuid), sample_type_id=str(kpi_sample_type),
                sample_type_name=str(kpi_sample_type_name)
            )
            LOGGER.warning('{:s} Supported Device-Endpoint-KpiSampleType items: {:s}'.format(
                MSG, str(monitoring_loops.get_all_resource_keys())))
            return [MSG]
    
        sampling_duration = request.sampling_duration_s # seconds
        sampling_interval = request.sampling_interval_s # seconds
    
        resources_to_subscribe = [(resource_key, sampling_duration, sampling_interval)]
        results_subscribestate = driver.SubscribeState(resources_to_subscribe)
    
        errors : List[str] = list()
        for (resource_key, duration, interval), result in zip(resources_to_subscribe, results_subscribestate):
            if isinstance(result, Exception):
                errors.append(ERROR_SUBSCRIBE.format(
                    device_uuid=str(device_uuid), subscr_key=str(resource_key), subscr_duration=str(duration),
                    subscr_interval=str(interval), error=str(result)
                ))
                continue
    
        monitoring_loops.add_kpi(device_uuid, resource_key, kpi_uuid, sampling_duration, sampling_interval)
        monitoring_loops.add_device(device_uuid, driver)
    
        return errors
    
    def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
        kpi_uuid = request.kpi_id.kpi_id.uuid    
    
        kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid)
        if kpi_details is None:
            return [ERROR_MISSING_KPI.format(kpi_uuid=str(kpi_uuid))]
    
        device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details
    
        resources_to_unsubscribe = [(resource_key, sampling_duration, sampling_interval)]
        results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
    
        errors : List[str] = list()
        for (resource_key, duration, interval), result in zip(resources_to_unsubscribe, results_unsubscribestate):
            if isinstance(result, Exception):
                errors.append(ERROR_UNSUBSCRIBE.format(
                    device_uuid=str(device_uuid), subscr_key=str(resource_key), subscr_duration=str(duration),
                    subscr_interval=str(interval), error=str(result)
                ))
                continue
    
        monitoring_loops.remove_kpi(kpi_uuid)
        #monitoring_loops.remove_device(device_uuid) # Do not remove; one monitoring_loop/device used by multiple requests
    
        return errors
    
    def update_endpoints(src_device : Device, dst_device : Device) -> None:
        for src_endpoint in src_device.device_endpoints:
            src_device_uuid   = src_endpoint.endpoint_id.device_id.device_uuid.uuid
            src_endpoint_uuid = src_endpoint.endpoint_id.endpoint_uuid.uuid
            src_context_uuid  = src_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
            src_topology_uuid = src_endpoint.endpoint_id.topology_id.topology_uuid.uuid
    
            for dst_endpoint in dst_device.device_endpoints:
                dst_endpoint_id = dst_endpoint.endpoint_id
                if src_endpoint_uuid not in {dst_endpoint_id.endpoint_uuid.uuid, dst_endpoint.name}: continue
                if src_device_uuid != dst_endpoint_id.device_id.device_uuid.uuid: continue
    
                dst_topology_id = dst_endpoint_id.topology_id
                if len(src_topology_uuid) > 0 and src_topology_uuid != dst_topology_id.topology_uuid.uuid: continue
                if len(src_context_uuid) > 0 and src_context_uuid != dst_topology_id.context_id.context_uuid.uuid: continue
                break   # found, do nothing
            else:
                # not found, add it
                dst_endpoint = dst_device.device_endpoints.add()    # pylint: disable=no-member
                dst_endpoint_id = dst_endpoint.endpoint_id
                dst_endpoint_id.endpoint_uuid.uuid = src_endpoint_uuid
                dst_endpoint_id.device_id.device_uuid.uuid = src_device_uuid
                dst_topology_id = dst_endpoint_id.topology_id
                if len(src_topology_uuid) > 0: dst_topology_id.topology_uuid.uuid = src_topology_uuid
                if len(src_context_uuid) > 0: dst_topology_id.context_id.context_uuid.uuid = src_context_uuid