Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 19.1 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, logging, os, time
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from prometheus_client import Histogram
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, get_env_var_name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Device, DeviceConfig, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Link,
    OpticalConfig, OpticalConfigId
)
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Device import get_device
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
from .monitoring.MonitoringLoops import MonitoringLoops
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ErrorMessages import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Tools import (
    check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules,
    deconfigure_rules, get_device_controller_uuid, populate_config_rules,
    populate_endpoint_monitoring_resources, populate_endpoints, populate_initial_config_rules,
    subscribe_kpi, unsubscribe_kpi, update_endpoints
)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Device', 'RPC')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL_DETAILS = MetricsPool('Device', 'execution', labels={
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    'driver': '', 'operation': '', 'step': '',
class DeviceServiceServicerImpl(DeviceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None:
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues = MutexQueues()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t0 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        device_uuid = request.device_id.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        connection_config_rules = check_connect_rules(request.device_config)
        check_no_endpoints(request.device_endpoints)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t1 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client = ContextClient()
        device = get_device(context_client, device_uuid, rw_copy=True)
        if device is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # not in context, create blank one to get UUID, and populate it below
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = Device()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device.device_id.CopyFrom(request.device_id)            # pylint: disable=no-member
            device.name = request.name
            device.device_type = request.device_type
            device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
            device.device_drivers.extend(request.device_drivers)    # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device.device_config.CopyFrom(request.device_config)    # pylint: disable=no-member
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            if request.HasField('controller_id'):
                controller_id = request.controller_id
                if controller_id.HasField('device_uuid'):
                    controller_device_uuid = controller_id.device_uuid.uuid
                    device.controller_id.device_uuid.uuid = controller_device_uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_id = context_client.SetDevice(device)
            device = get_device(context_client, device_id.device_uuid.uuid, rw_copy=True)

        # update device_uuid to honor UUID provided by Context
        device_uuid = device.device_id.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        device_name = device.name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t2 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.add_alias(device_uuid, device_name)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t3 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = get_driver(self.driver_instance_cache, device)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t4 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            errors = []

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # Sub-devices and sub-links are exposed by intermediate controllers or represent mgmt links.
            # They are used to assist in path computation algorithms, and/or to identify dependencies
            # (which controller is in charge of which sub-device).
            new_sub_devices : Dict[str, Device] = dict()
            new_sub_links : Dict[str, Link] = dict()
            
            #----- Experimental ------------
            new_optical_configs : Dict[str, OpticalConfig] = dict()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(device.device_endpoints) == 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                t5 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                # created from request, populate endpoints using driver
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                errors.extend(populate_endpoints(
                    device, driver, self.monitoring_loops, new_sub_devices, new_sub_links,
                    new_optical_configs
                ))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                t6 = time.time()
                t_pop_endpoints = t6 - t5
            else:
                t_pop_endpoints = None

            is_optical_device = request.device_drivers[0] == DeviceDriverEnum.DEVICEDRIVER_OC
            if len(device.device_config.config_rules) == len(connection_config_rules) and not is_optical_device:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                # created from request, populate config rules using driver
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                t7 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                errors.extend(populate_config_rules(device, driver))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                t8 = time.time()
                t_pop_config_rules = t8 - t7
            else:
                t_pop_config_rules = None
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # TODO: populate components

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(errors) > 0:
                for error in errors: LOGGER.error(error)
                raise OperationFailedException('AddDevice', extra_details=errors)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t9 = time.time()

            ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            environment_variables = set(os.environ.keys())
            if ztp_service_host in environment_variables:
                # ZTP component is deployed; leave devices disabled. ZTP will enable them.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
            else:
                # ZTP is not deployed; assume the device is ready while onboarding and set them as enabled.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED

            # temporary line
            if  is_optical_device:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                #for endpoint in request.device_endpoints:
                #    #endpoint.endpoint_id.device_id.CopyFrom(device.device_id)
                #    pass

                if 'new_optical_config' in new_optical_configs and 'opticalconfig' in new_optical_configs["new_optical_config"]:
                    context_client.SetOpticalConfig(new_optical_configs["new_optical_config"]['opticalconfig'])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_id = context_client.SetDevice(device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t10 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            for sub_device in new_sub_devices.values():
                context_client.SetDevice(sub_device)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t11 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            for sub_links in new_sub_links.values():
                context_client.SetLink(sub_links)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t12 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # Update endpoint monitoring resources with UUIDs
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_with_uuids = get_device(
                context_client, device_id.device_uuid.uuid, rw_copy=False, include_endpoints=True,
                include_components=False, include_config_rules=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t13 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            t14 = time.time()

            metrics_labels = dict(driver=driver.name, operation='add_device')

            histogram_duration : Histogram = METRICS_POOL_DETAILS.get_or_create(
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                'details', MetricTypeEnum.HISTOGRAM_DURATION)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            histogram_duration.labels(step='total'              , **metrics_labels).observe(t14-t0)
            histogram_duration.labels(step='execution'          , **metrics_labels).observe(t14-t3)
            histogram_duration.labels(step='endpoint_checks'    , **metrics_labels).observe(t1-t0)
            histogram_duration.labels(step='get_device'         , **metrics_labels).observe(t2-t1)
            histogram_duration.labels(step='wait_queue'         , **metrics_labels).observe(t3-t2)
            histogram_duration.labels(step='get_driver'         , **metrics_labels).observe(t4-t3)
            histogram_duration.labels(step='set_device'         , **metrics_labels).observe(t10-t9)
            histogram_duration.labels(step='populate_monit_rsrc', **metrics_labels).observe(t13-t12)

            if t_pop_endpoints is not None:
                histogram_duration.labels(step='populate_endpoints', **metrics_labels).observe(t_pop_endpoints)

            if t_pop_config_rules is not None:
                histogram_duration.labels(step='populate_config_rules', **metrics_labels).observe(t_pop_config_rules)

            if len(new_sub_devices) > 0:
                histogram_duration.labels(step='set_sub_devices', **metrics_labels).observe(t11-t10)

            if len(new_sub_links) > 0:
                histogram_duration.labels(step='set_sub_links', **metrics_labels).observe(t12-t11)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return device_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t0 = time.time()
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        t1 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t2 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = get_device(
                context_client, device_uuid, rw_copy=True, include_endpoints=False, include_components=False,
                include_config_rules=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if device is None:
                raise NotFoundException('Device', device_uuid, extra_details='loading in ConfigureDevice')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t3 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_controller_uuid = get_device_controller_uuid(device)
            if device_controller_uuid is not None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                device = get_device(
                    context_client, device_controller_uuid, rw_copy=True, include_endpoints=False,
                    include_components=False, include_config_rules=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                if device is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    raise NotFoundException(
                        'Device', device_controller_uuid, extra_details='loading in ConfigureDevice')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_uuid = device.device_id.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = get_driver(self.driver_instance_cache, device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if driver is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('ConfigureDevice', extra_details=msg)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if DeviceDriverEnum.DEVICEDRIVER_P4 in device.device_drivers:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=True, include_components=False,
                    include_config_rules=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                # P4 Driver, by now, has no means to retrieve endpoints
                # We allow defining the endpoints manually
                update_endpoints(request, device)

                # Update endpoints to get UUIDs
                device_id = context_client.SetDevice(device)
                device = context_client.GetDevice(device_id)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
            environment_variables = set(os.environ.keys())
            if ztp_service_host in environment_variables:
                # ZTP component is deployed; accept status updates
                if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
                    device.device_operational_status = request.device_operational_status
            else:
                # ZTP is not deployed; activated during AddDevice and not modified
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                pass
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t4 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # TODO: use of datastores (might be virtual ones) to enable rollbacks
            resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t5 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            errors = []
            errors.extend(configure_rules(device, driver, resources_to_set))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t6 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            errors.extend(deconfigure_rules(device, driver, resources_to_delete))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t7 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(errors) > 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                for error in errors: LOGGER.error(error)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('ConfigureDevice', extra_details=errors)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # Context Performance+Scalability enhancement:
            # This method, besides P4 logic, does not add/update/delete endpoints.
            # Remove endpoints to reduce number of inserts done by Context.
            # TODO: Add logic to inspect endpoints and keep only those ones modified with respect to Context.
            del device.device_endpoints[:]

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            t8 = time.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # Note: Rules are updated by configure_rules() and deconfigure_rules() methods.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device_id = context_client.SetDevice(device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            t9 = time.time()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            metrics_labels = dict(driver=driver.name, operation='configure_device')

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            histogram_duration : Histogram = METRICS_POOL_DETAILS.get_or_create(
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                'details', MetricTypeEnum.HISTOGRAM_DURATION)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            histogram_duration.labels(step='total'            , **metrics_labels).observe(t9-t0)
            histogram_duration.labels(step='wait_queue'       , **metrics_labels).observe(t1-t0)
            histogram_duration.labels(step='execution'        , **metrics_labels).observe(t9-t1)
            histogram_duration.labels(step='get_device'       , **metrics_labels).observe(t3-t2)
            histogram_duration.labels(step='split_rules'      , **metrics_labels).observe(t5-t4)
            histogram_duration.labels(step='configure_rules'  , **metrics_labels).observe(t6-t5)
            histogram_duration.labels(step='deconfigure_rules', **metrics_labels).observe(t7-t6)
            histogram_duration.labels(step='set_device'       , **metrics_labels).observe(t9-t8)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return device_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = get_device(
                context_client, device_uuid, rw_copy=False, include_endpoints=False, include_config_rules=False,
                include_components=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if device is None:
                raise NotFoundException('Device', device_uuid, extra_details='loading in DeleteDevice')
            device_uuid = device.device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self.monitoring_loops.remove_device(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self.driver_instance_cache.delete(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client.RemoveDevice(request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        device_uuid = request.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = get_device(
                context_client, device_uuid, rw_copy=False, include_endpoints=False, include_components=False,
                include_config_rules=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if device is None:
                raise NotFoundException('Device', device_uuid, extra_details='loading in DeleteDevice')

            driver : _Driver = get_driver(self.driver_instance_cache, device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if driver is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('GetInitialConfig', extra_details=msg)

            device_config = DeviceConfig()
            errors = populate_initial_config_rules(device_uuid, device_config, driver)

            if len(errors) > 0:
                for error in errors: LOGGER.error(error)
                raise OperationFailedException('GetInitialConfig', extra_details=errors)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return device_config
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
        manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if subscribe:
            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        else:
            # unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs
            kpi_uuid = request.kpi_id.kpi_id.uuid
            kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid)
            if kpi_details is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                msg = ERROR_MISSING_KPI.format(kpi_uuid=str(kpi_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = kpi_details[0]

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            device = get_device(
                context_client, device_uuid, rw_copy=False, include_endpoints=False, include_components=False,
                include_config_rules=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if device is None:
                raise NotFoundException('Device', device_uuid, extra_details='loading in DeleteDevice')

            driver : _Driver = get_driver(self.driver_instance_cache, device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if driver is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            errors = manage_kpi_method(request, driver, self.monitoring_loops)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)