Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 8.52 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, logging
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, DeviceOperationalStatusEnum, Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Device import get_device
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from device.service.Errors import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
from .monitoring.MonitoringLoops import MonitoringLoops
from .Tools import (
    check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules,
    populate_config_rules, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Device', 'RPC')
class DeviceServiceServicerImpl(DeviceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None:
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues = MutexQueues()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        connection_config_rules = check_connect_rules(request.device_config)
        check_no_endpoints(request.device_endpoints)

        context_client = ContextClient()
        device = get_device(context_client, device_uuid, rw_copy=True)
        if device is None:
            # not in context, create from request
            device = Device()
            device.CopyFrom(request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = get_driver(self.driver_instance_cache, device)

            errors = []

            if len(device.device_endpoints) == 0:
                # created from request, populate endpoints using driver
                errors.extend(populate_endpoints(device, driver, self.monitoring_loops))

            if len(device.device_config.config_rules) == len(connection_config_rules):
                # created from request, populate config rules using driver
                errors.extend(populate_config_rules(device, driver))

            if len(errors) > 0:
                for error in errors: LOGGER.error(error)
                raise OperationFailedException('AddDevice', extra_details=errors)

            device_id = context_client.SetDevice(device)
            return device_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
            device = get_device(context_client, device_uuid, rw_copy=True)
            if device is None:
                raise NotFoundException('Device', device_uuid, extra_details='loading in ConfigureDevice')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                msg = ERROR_MISSING_DRIVER.format(str(device_uuid))
                raise OperationFailedException('ConfigureDevice', extra_details=msg)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
                device.device_operational_status = request.device_operational_status

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # TODO: use of datastores (might be virtual ones) to enable rollbacks
            resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            errors = []
            errors.extend(configure_rules(device, driver, resources_to_set))
            errors.extend(deconfigure_rules(device, driver, resources_to_delete))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            if len(errors) > 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                for error in errors: LOGGER.error(error)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                raise OperationFailedException('ConfigureDevice', extra_details=errors)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # Rules updated by configure_rules() and deconfigure_rules() methods.
            # Code to be removed soon if not needed.
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            del device.device_config.config_rules[:]
            populate_config_rules(device, driver)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            device_id = context_client.SetDevice(device)
            return device_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client = ContextClient()
            self.monitoring_loops.remove_device(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            self.driver_instance_cache.delete(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            context_client.RemoveDevice(request)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        device_uuid = request.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = ERROR_MISSING_DRIVER.format(str(device_uuid))
                raise OperationFailedException('GetInitialConfig', extra_details=msg)

            device_config = DeviceConfig()
            errors = populate_initial_config_rules(device_uuid, device_config, driver)

            if len(errors) > 0:
                for error in errors: LOGGER.error(error)
                raise OperationFailedException('GetInitialConfig', extra_details=errors)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return device_config
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
        manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if subscribe:
            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        else:
            # unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs
            kpi_uuid = request.kpi_id.kpi_id.uuid
            kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid)
            if kpi_details is None:
                msg = ERROR_MISSING_KPI.format(str(kpi_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = kpi_details[0]

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.mutex_queues.wait_my_turn(device_uuid)
        try:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = ERROR_MISSING_DRIVER.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            errors = manage_kpi_method(request, driver, self.monitoring_loops)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)