Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 12.6 KiB
Newer Older
from typing import Any, List, Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object, update_or_create_object
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Checkers import chk_integer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class DeviceServiceServicerImpl(DeviceServiceServicer):
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops):

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self.context_client = context_client
        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        connection_config_rules = {}
        unexpected_config_rules = []
        for config_rule in request.device_config.config_rules:
            if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
               (config_rule.resource_key.startswith('_connect/')):
               connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value
            else:
                unexpected_config_rules.append(config_rule)
        if len(unexpected_config_rules) > 0:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            raise InvalidArgumentException(
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                'device.device_config.config_rules', str(unexpected_config_rules),
                extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                              'with "_connect/" tag. Others should be configured after adding the device.')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device,_ = update_device_in_local_database(self.database, request)

        driver_filter_fields = get_device_driver_filter_fields(db_device)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        address = connection_config_rules.pop('address', None)
        port    = connection_config_rules.pop('port', None)
        driver : _Driver = self.driver_instance_cache.get(
            device_uuid, filter_fields=driver_filter_fields, address=address, port=port,
            settings=connection_config_rules)
        driver.Connect()

        running_config_rules = driver.GetConfig()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        running_config_rules = [(ORM_ConfigActionEnum.SET, rule[0], rule[1]) for rule in running_config_rules]
        LOGGER.info('[AddDevice] running_config_rules = {:s}'.format(str(running_config_rules)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
        LOGGER.info('[AddDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        # TODO: Compute diff between current context config and device config. The one in device is of higher priority
        # (might happen another instance is updating config and context was not already updated)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        update_config(self.database, device_uuid, 'running', running_config_rules)

        initial_config_rules = driver.GetInitialConfig()
        update_config(self.database, device_uuid, 'initial', initial_config_rules)

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sync_device_from_context(device_uuid, self.context_client, self.database)

        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
        context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))

        db_device,_ = update_device_in_local_database(self.database, request)

        request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
        LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
        resources_to_set         : List[Tuple[str, Any]]          = [] # key, value
        resources_to_delete      : List[str]                      = [] # key
        resources_to_subscribe   : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        for config_rule in request_config_rules:
            action, key, value = config_rule
            if action == ORM_ConfigActionEnum.SET:
                if (key not in context_config_rules) or (context_config_rules[key] != value):
                    resources_to_set.append((key, value))
            elif action == ORM_ConfigActionEnum.DELETE:
                if key in context_config_rules:
                    resources_to_delete.append(key)
        # TODO: Implement configuration of subscriptions
        # TODO: use of datastores (might be virtual ones) to enable rollbacks

        errors = []

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))

        if len(errors) == 0:
            results_setconfig = driver.SetConfig(resources_to_set)
            errors.extend(check_set_errors(resources_to_set, results_setconfig))
        if len(errors) == 0:
            results_deleteconfig = driver.DeleteConfig(resources_to_delete)
            errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))

        if len(errors) == 0:
            results_subscribestate = driver.SubscribeState(resources_to_subscribe)
            errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate))

        if len(errors) == 0:
            results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
            errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))

        if len(errors) > 0:
            raise OperationFailedException('ConfigureDevice', extra_details=errors)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None: return Empty()

        self.driver_instance_cache.delete(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        delete_device_from_context(db_device, self.context_client)

        for db_endpoint_pk,_ in db_device.references(EndPointModel):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            EndPointModel(self.database, db_endpoint_pk).delete()

        for db_driver_pk,_ in db_device.references(DriverModel):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            DriverModel(self.database, db_driver_pk).delete()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
        for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, db_config_rule_pk).delete()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
        for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, db_config_rule_pk).delete()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_device.delete()
        db_initial_config.delete()
        db_running_config.delete()
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        device_uuid = request.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

        config_rules = {} if db_device is None else db_device.dump_initial_config()
        return DeviceConfig(config_rules=config_rules)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        kpi_uuid = request.kpi_id.kpi_id.uuid

        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

        endpoint_id = request.kpi_descriptor.endpoint_id
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
        if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
        result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, {
            'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
            'kpi_description'  : request.kpi_descriptor.kpi_description,
            'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
            'device_fk'        : db_device,
            'endpoint_fk'      : db_endpoint,
            'sampling_duration': request.sampling_duration_s,
            'sampling_interval': request.sampling_interval_s,
        })
        db_kpi, updated = result

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid)

        results = driver.SubscribeState([
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ])
        assert len(results) == 4
        for result in results: assert isinstance(result, bool) and result

        self.monitoring_loops.add(device_uuid, driver)