Skip to content
Snippets Groups Projects
Select Git revision
  • 2d5608e75678905be2863e62249c06f2e5ec9599
  • master default
  • feat/331-bitnami-kafka-moved-to-bitnamilegacy-kafka
  • feat/330-tid-pcep-component
  • feat/tid-newer-pcep-component
  • feat/policy-refactor
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/329-ecoc-2025-hack-your-research
  • develop protected
  • feat/116-ubi-updates-in-telemetry-backend-to-support-p4-in-band-network-telemetry
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • feat/307-update-python-version-service
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit_tapi
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • CTTC-IMPLEMENT-NBI-CONNECTOR-NOS-ZTP
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

Objects.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    DeviceServiceServicerImpl.py 19.11 KiB
    # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import grpc, logging, os, time
    from typing import Dict
    from prometheus_client import Histogram
    from common.Constants import ServiceNameEnum
    from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, get_env_var_name
    from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, safe_and_metered_rpc_method
    from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException
    from common.proto.context_pb2 import (
        Device, DeviceConfig, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Link,
        OpticalConfig, OpticalConfigId
    )
    from common.proto.device_pb2 import MonitoringSettings
    from common.proto.device_pb2_grpc import DeviceServiceServicer
    from common.tools.context_queries.Device import get_device
    from common.tools.mutex_queues.MutexQueues import MutexQueues
    from context.client.ContextClient import ContextClient
    from .driver_api._Driver import _Driver
    from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
    from .monitoring.MonitoringLoops import MonitoringLoops
    from .ErrorMessages import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI
    from .Tools import (
        check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules,
        deconfigure_rules, get_device_controller_uuid, populate_config_rules,
        populate_endpoint_monitoring_resources, populate_endpoints, populate_initial_config_rules,
        subscribe_kpi, unsubscribe_kpi, update_endpoints
    )
    
    # Module-level logger; also handed to the RPC decorator of every servicer method below.
    LOGGER = logging.getLogger(__name__)

    # Metrics pool passed to safe_and_metered_rpc_method() for each RPC of this servicer.
    METRICS_POOL = MetricsPool('Device', 'RPC')

    # Metrics pool used by AddDevice/ConfigureDevice to record per-step duration histograms.
    # Each observation is labelled with the driver name, the operation and the step measured.
    METRICS_POOL_DETAILS = MetricsPool('Device', 'execution', labels={
        'driver': '', 'operation': '', 'step': '',
    })
    
    class DeviceServiceServicerImpl(DeviceServiceServicer):
        def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None:
            """Store the collaborators of the servicer and create its mutex queues.

            Args:
                driver_instance_cache: cache the RPC methods hand to get_driver().
                monitoring_loops: monitoring loops shared with the RPC methods.
            """
            LOGGER.debug('Creating Servicer...')
            # Fresh set of mutex queues; the RPC methods wait/signal on them per device UUID.
            self.mutex_queues = MutexQueues()
            self.monitoring_loops = monitoring_loops
            self.driver_instance_cache = driver_instance_cache
            LOGGER.debug('Servicer Created')
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
            """Onboard a device: store it in Context and populate it through its driver.

            The request is validated with check_connect_rules() and check_no_endpoints(). When
            Context does not hold the device yet, a blank copy built from the request is stored
            first so that the UUID reported by Context is the one used afterwards. While holding
            the per-device mutex queue, endpoints and config rules are populated through the
            driver (when not already present), the operational status is chosen depending on
            whether the ZTP service host environment variable is defined, and the device, its
            sub-devices and sub-links are stored in Context. Per-step durations are recorded in
            the 'details' histogram.

            Args:
                request: device to add, carrying its connection config rules and no endpoints.
                context: gRPC servicer context; not referenced in this method body.

            Returns:
                The DeviceId returned by Context when the populated device is stored.

            Raises:
                OperationFailedException: when get_driver() yields no driver, or when populating
                    endpoints / config rules reports errors.
            """
            t0 = time.time()

            device_uuid = request.device_id.device_uuid.uuid

            connection_config_rules = check_connect_rules(request.device_config)
            check_no_endpoints(request.device_endpoints)

            t1 = time.time()

            context_client = ContextClient()
            # The success path closes the client explicitly (that time is part of the 'total'
            # metric); this flag lets the outer finally close it on every other path.
            client_closed = False
            try:
                device = get_device(context_client, device_uuid, rw_copy=True)
                if device is None:
                    # not in context, create blank one to get UUID, and populate it below
                    device = Device()
                    device.device_id.CopyFrom(request.device_id)            # pylint: disable=no-member
                    device.name = request.name
                    device.device_type = request.device_type
                    device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
                    device.device_drivers.extend(request.device_drivers)    # pylint: disable=no-member
                    device.device_config.CopyFrom(request.device_config)    # pylint: disable=no-member

                    if request.HasField('controller_id'):
                        controller_id = request.controller_id
                        if controller_id.HasField('device_uuid'):
                            controller_device_uuid = controller_id.device_uuid.uuid
                            device.controller_id.device_uuid.uuid = controller_device_uuid

                    device_id = context_client.SetDevice(device)
                    device = get_device(context_client, device_id.device_uuid.uuid, rw_copy=True)

                # update device_uuid to honor UUID provided by Context
                device_uuid = device.device_id.device_uuid.uuid
                device_name = device.name

                t2 = time.time()

                self.mutex_queues.add_alias(device_uuid, device_name)
                self.mutex_queues.wait_my_turn(device_uuid)
                t3 = time.time()
                try:
                    driver : _Driver = get_driver(self.driver_instance_cache, device)
                    if driver is None:
                        # Same guard as the other RPCs: fail explicitly instead of crashing later.
                        msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                        raise OperationFailedException('AddDevice', extra_details=msg)

                    t4 = time.time()

                    errors = []

                    # Sub-devices and sub-links are exposed by intermediate controllers or represent
                    # mgmt links. They are used to assist in path computation algorithms, and/or to
                    # identify dependencies (which controller is in charge of which sub-device).
                    new_sub_devices : Dict[str, Device] = dict()
                    new_sub_links : Dict[str, Link] = dict()

                    #----- Experimental ------------
                    new_optical_configs : Dict[str, OpticalConfig] = dict()

                    if len(device.device_endpoints) == 0:
                        t5 = time.time()
                        # created from request, populate endpoints using driver
                        errors.extend(populate_endpoints(
                            device, driver, self.monitoring_loops, new_sub_devices, new_sub_links,
                            new_optical_configs
                        ))
                        t6 = time.time()
                        t_pop_endpoints = t6 - t5
                    else:
                        t_pop_endpoints = None

                    # Only the first driver of the request decides whether the device is optical.
                    # A request without drivers is treated as non-optical instead of raising
                    # IndexError.
                    request_drivers = request.device_drivers
                    is_optical_device = (
                        len(request_drivers) > 0 and
                        request_drivers[0] == DeviceDriverEnum.DEVICEDRIVER_OC
                    )
                    only_connect_rules = len(device.device_config.config_rules) == len(connection_config_rules)
                    if only_connect_rules and not is_optical_device:
                        # created from request, populate config rules using driver
                        t7 = time.time()
                        errors.extend(populate_config_rules(device, driver))
                        t8 = time.time()
                        t_pop_config_rules = t8 - t7
                    else:
                        t_pop_config_rules = None

                    # TODO: populate components

                    if errors:
                        for error in errors: LOGGER.error(error)
                        raise OperationFailedException('AddDevice', extra_details=errors)

                    t9 = time.time()

                    ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
                    if ztp_service_host in os.environ:
                        # ZTP component is deployed; leave devices disabled. ZTP will enable them.
                        device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
                    else:
                        # ZTP is not deployed; assume the device is ready while onboarding and set them as enabled.
                        device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED

                    # temporary line
                    if is_optical_device:
                        new_optical_config = new_optical_configs.get('new_optical_config')
                        if new_optical_config is not None and 'opticalconfig' in new_optical_config:
                            context_client.SetOpticalConfig(new_optical_config['opticalconfig'])

                    device_id = context_client.SetDevice(device)

                    t10 = time.time()

                    for sub_device in new_sub_devices.values():
                        context_client.SetDevice(sub_device)

                    t11 = time.time()

                    for sub_link in new_sub_links.values():
                        context_client.SetLink(sub_link)

                    t12 = time.time()

                    # Update endpoint monitoring resources with UUIDs
                    device_with_uuids = get_device(
                        context_client, device_id.device_uuid.uuid, rw_copy=False, include_endpoints=True,
                        include_components=False, include_config_rules=False)
                    populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops)

                    t13 = time.time()

                    context_client.close()
                    client_closed = True

                    t14 = time.time()

                    metrics_labels = dict(driver=driver.name, operation='add_device')

                    histogram_duration : Histogram = METRICS_POOL_DETAILS.get_or_create(
                        'details', MetricTypeEnum.HISTOGRAM_DURATION)
                    histogram_duration.labels(step='total'              , **metrics_labels).observe(t14-t0)
                    histogram_duration.labels(step='execution'          , **metrics_labels).observe(t14-t3)
                    histogram_duration.labels(step='endpoint_checks'    , **metrics_labels).observe(t1-t0)
                    histogram_duration.labels(step='get_device'         , **metrics_labels).observe(t2-t1)
                    histogram_duration.labels(step='wait_queue'         , **metrics_labels).observe(t3-t2)
                    histogram_duration.labels(step='get_driver'         , **metrics_labels).observe(t4-t3)
                    histogram_duration.labels(step='set_device'         , **metrics_labels).observe(t10-t9)
                    histogram_duration.labels(step='populate_monit_rsrc', **metrics_labels).observe(t13-t12)

                    if t_pop_endpoints is not None:
                        histogram_duration.labels(step='populate_endpoints', **metrics_labels).observe(t_pop_endpoints)

                    if t_pop_config_rules is not None:
                        histogram_duration.labels(step='populate_config_rules', **metrics_labels).observe(t_pop_config_rules)

                    if len(new_sub_devices) > 0:
                        histogram_duration.labels(step='set_sub_devices', **metrics_labels).observe(t11-t10)

                    if len(new_sub_links) > 0:
                        histogram_duration.labels(step='set_sub_links', **metrics_labels).observe(t12-t11)

                    return device_id
                finally:
                    self.mutex_queues.signal_done(device_uuid)
            finally:
                # Do not leak the Context client when any step above raised.
                if not client_closed:
                    context_client.close()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
            """Apply the config rules of the request to a device through its driver.

            The device is loaded from Context; when get_device_controller_uuid() reports a
            controller, the controller device is loaded and used instead. The rules to set and
            to delete are computed with compute_rules_to_add_delete() and applied through the
            driver, after which the device (without endpoints) is stored back in Context.
            Per-step durations are recorded in the 'details' histogram.

            Args:
                request: device carrying the desired config rules (and, for P4 devices, the
                    manually defined endpoints).
                context: gRPC servicer context; not referenced in this method body.

            Returns:
                The DeviceId returned by Context when the device is stored.

            Raises:
                NotFoundException: when the device or its controller is not found in Context.
                OperationFailedException: when no driver is available or when configuring /
                    deconfiguring rules reports errors.
            """
            t0 = time.time()
            device_id = request.device_id
            device_uuid = device_id.device_uuid.uuid

            # device_uuid is reassigned below (possibly to the controller's UUID); remember the
            # queue name waited on so that exactly that queue is released in the finally clause.
            mutex_queue_name = device_uuid
            self.mutex_queues.wait_my_turn(mutex_queue_name)
            t1 = time.time()
            context_client = None
            try:
                context_client = ContextClient()
                t2 = time.time()
                device = get_device(
                    context_client, device_uuid, rw_copy=True, include_endpoints=False, include_components=False,
                    include_config_rules=True)
                if device is None:
                    raise NotFoundException('Device', device_uuid, extra_details='loading in ConfigureDevice')

                t3 = time.time()
                device_controller_uuid = get_device_controller_uuid(device)
                if device_controller_uuid is not None:
                    device = get_device(
                        context_client, device_controller_uuid, rw_copy=True, include_endpoints=False,
                        include_components=False, include_config_rules=True)
                    if device is None:
                        raise NotFoundException(
                            'Device', device_controller_uuid, extra_details='loading in ConfigureDevice')

                device_uuid = device.device_id.device_uuid.uuid
                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('ConfigureDevice', extra_details=msg)

                if DeviceDriverEnum.DEVICEDRIVER_P4 in device.device_drivers:
                    device = get_device(
                        context_client, device_uuid, rw_copy=False, include_endpoints=True, include_components=False,
                        include_config_rules=True)
                    # P4 Driver, by now, has no means to retrieve endpoints
                    # We allow defining the endpoints manually
                    update_endpoints(request, device)

                    # Update endpoints to get UUIDs
                    device_id = context_client.SetDevice(device)
                    device = context_client.GetDevice(device_id)

                ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
                if ztp_service_host in os.environ:
                    # ZTP component is deployed; accept status updates
                    if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
                        device.device_operational_status = request.device_operational_status
                # else: ZTP is not deployed; activated during AddDevice and not modified

                t4 = time.time()
                # TODO: use of datastores (might be virtual ones) to enable rollbacks
                resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request)

                t5 = time.time()
                errors = []
                errors.extend(configure_rules(device, driver, resources_to_set))
                t6 = time.time()
                errors.extend(deconfigure_rules(device, driver, resources_to_delete))

                t7 = time.time()
                if errors:
                    for error in errors: LOGGER.error(error)
                    raise OperationFailedException('ConfigureDevice', extra_details=errors)

                # Context Performance+Scalability enhancement:
                # This method, besides P4 logic, does not add/update/delete endpoints.
                # Remove endpoints to reduce number of inserts done by Context.
                # TODO: Add logic to inspect endpoints and keep only those ones modified with respect to Context.
                del device.device_endpoints[:]

                t8 = time.time()
                # Note: Rules are updated by configure_rules() and deconfigure_rules() methods.
                device_id = context_client.SetDevice(device)

                t9 = time.time()

                metrics_labels = dict(driver=driver.name, operation='configure_device')

                histogram_duration : Histogram = METRICS_POOL_DETAILS.get_or_create(
                    'details', MetricTypeEnum.HISTOGRAM_DURATION)
                histogram_duration.labels(step='total'            , **metrics_labels).observe(t9-t0)
                histogram_duration.labels(step='wait_queue'       , **metrics_labels).observe(t1-t0)
                histogram_duration.labels(step='execution'        , **metrics_labels).observe(t9-t1)
                histogram_duration.labels(step='get_device'       , **metrics_labels).observe(t3-t2)
                histogram_duration.labels(step='split_rules'      , **metrics_labels).observe(t5-t4)
                histogram_duration.labels(step='configure_rules'  , **metrics_labels).observe(t6-t5)
                histogram_duration.labels(step='deconfigure_rules', **metrics_labels).observe(t7-t6)
                histogram_duration.labels(step='set_device'       , **metrics_labels).observe(t9-t8)

                return device_id
            finally:
                # Release the queue that was actually acquired, then the Context client.
                self.mutex_queues.signal_done(mutex_queue_name)
                if context_client is not None:
                    context_client.close()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
            """Remove a device from the monitoring loops, the driver cache and Context.

            Args:
                request: identifier of the device to delete.
                context: gRPC servicer context; not referenced in this method body.

            Returns:
                An Empty message once the device has been removed.

            Raises:
                NotFoundException: when the device is not found in Context.
            """
            device_uuid = request.device_uuid.uuid

            # device_uuid is replaced below by the UUID stored in Context; remember the queue
            # name waited on so that exactly that queue is released in the finally clause.
            mutex_queue_name = device_uuid
            self.mutex_queues.wait_my_turn(mutex_queue_name)
            context_client = None
            try:
                context_client = ContextClient()
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False, include_config_rules=False,
                    include_components=False)
                if device is None:
                    raise NotFoundException('Device', device_uuid, extra_details='loading in DeleteDevice')
                device_uuid = device.device_id.device_uuid.uuid

                self.monitoring_loops.remove_device(device_uuid)
                self.driver_instance_cache.delete(device_uuid)
                context_client.RemoveDevice(request)
                return Empty()
            finally:
                # Release the queue that was actually acquired, then the Context client.
                self.mutex_queues.signal_done(mutex_queue_name)
                if context_client is not None:
                    context_client.close()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
            """Return the initial configuration of a device as reported through its driver.

            Args:
                request: identifier of the device to query.
                context: gRPC servicer context; not referenced in this method body.

            Returns:
                A DeviceConfig filled in by populate_initial_config_rules().

            Raises:
                NotFoundException: when the device is not found in Context.
                OperationFailedException: when no driver is available or when populating the
                    initial config rules reports errors.
            """
            device_uuid = request.device_uuid.uuid

            self.mutex_queues.wait_my_turn(device_uuid)
            context_client = None
            try:
                context_client = ContextClient()
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False, include_components=False,
                    include_config_rules=True)
                if device is None:
                    # Name this RPC in the error details (was a copy-paste of DeleteDevice).
                    raise NotFoundException('Device', device_uuid, extra_details='loading in GetInitialConfig')

                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('GetInitialConfig', extra_details=msg)

                device_config = DeviceConfig()
                errors = populate_initial_config_rules(device_uuid, device_config, driver)

                if len(errors) > 0:
                    for error in errors: LOGGER.error(error)
                    raise OperationFailedException('GetInitialConfig', extra_details=errors)

                return device_config
            finally:
                self.mutex_queues.signal_done(device_uuid)
                # Do not leak the Context client, whatever the outcome.
                if context_client is not None:
                    context_client.close()
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
            """Subscribe to, or unsubscribe from, a KPI of a device.

            The request is a subscription when both sampling_duration_s and sampling_interval_s
            are greater than zero; otherwise it is an unsubscription, and the device UUID is
            taken from the KPI details recorded in the monitoring loops.

            Args:
                request: monitoring settings carrying the KPI descriptor (subscribe) or the KPI
                    identifier (unsubscribe).
                context: gRPC servicer context; not referenced in this method body.

            Returns:
                An Empty message once the KPI has been (un)subscribed.

            Raises:
                NotFoundException: when the device is not found in Context.
                OperationFailedException: when the KPI to unsubscribe is unknown, no driver is
                    available, or the (un)subscription reports errors.
            """
            subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
            manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi

            if subscribe:
                device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
            else:
                # unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs
                kpi_uuid = request.kpi_id.kpi_id.uuid
                kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid)
                if kpi_details is None:
                    msg = ERROR_MISSING_KPI.format(kpi_uuid=str(kpi_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
                device_uuid = kpi_details[0]

            self.mutex_queues.wait_my_turn(device_uuid)
            context_client = None
            try:
                context_client = ContextClient()
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False, include_components=False,
                    include_config_rules=True)
                if device is None:
                    # Name this RPC in the error details (was a copy-paste of DeleteDevice).
                    raise NotFoundException('Device', device_uuid, extra_details='loading in MonitorDeviceKpi')

                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                errors = manage_kpi_method(request, driver, self.monitoring_loops)
                if len(errors) > 0:
                    # Log each error before raising, as the other RPCs of this servicer do.
                    for error in errors: LOGGER.error(error)
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

                return Empty()
            finally:
                self.mutex_queues.signal_done(device_uuid)
                # Do not leak the Context client, whatever the outcome.
                if context_client is not None:
                    context_client.close()