# DeviceServiceServicerImpl.py
import logging
from typing import Any, List, Tuple

import grpc

from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object
from common.orm.backend.BackendEnum import BackendEnum
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.driver_api.Tools import check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors
from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache

# Module-level logger. It is handed to the RPC decorator and used for the
# informational messages emitted by the servicer methods in this module.
LOGGER = logging.getLogger(__name__)

# Service name and RPC method names given to create_metrics(); the resulting
# METRICS object is passed to safe_and_metered_rpc_method on every RPC method.
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class DeviceServiceServicerImpl(DeviceServiceServicer):
    """gRPC servicer implementing the AddDevice, ConfigureDevice, DeleteDevice
    and GetInitialConfig RPCs of the Device service.

    The servicer owns a local in-memory database. Every RPC starts by calling
    sync_device_from_context() for the addressed device, and the mutating RPCs
    finish by pushing the device back through the Context client. Devices are
    reached through driver instances taken from the driver instance cache.
    """

    def __init__(self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache):
        """Store the collaborators and create the local in-memory database.

        Args:
            context_client: client used to synchronise devices with Context.
            driver_instance_cache: cache from which per-device drivers are
                obtained and into which they are released on deletion.
        """
        self.context_client = context_client
        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.driver_instance_cache = driver_instance_cache

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Add a device, connect its driver and record its configuration.

        The request must not carry config rules: the device has to be added
        first and configured afterwards through ConfigureDevice.

        Args:
            request: device to add.
            context: gRPC servicer context (not used by this method).

        Returns:
            DeviceId built from the identifier dumped by the local device object.

        Raises:
            InvalidArgumentException: if the request contains config rules.
        """
        device_uuid = request.device_id.device_uuid.uuid

        if len(request.device_config.config_rules) > 0:
            raise InvalidArgumentException(
                'device.device_config.config_rules', str(request.device_config.config_rules),
                extra_details='RPC method AddDevice does not allow definition of Config Rules. '\
                              'Add the Device first, and then configure it.')

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device,_ = update_device_in_local_database(self.database, request)

        # Obtain the driver matching this device and open the connection to it.
        driver_filter_fields = get_device_driver_filter_fields(db_device)
        driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields)
        driver.Connect()

        # Every rule reported by the device is recorded as a SET action.
        running_config_rules = [
            (ORM_ConfigActionEnum.SET, rule[0], rule[1])
            for rule in driver.GetConfig()
        ]
        LOGGER.info('[AddDevice] running_config_rules = {:s}'.format(str(running_config_rules)))

        # Currently only logged; see the TODO below.
        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
        LOGGER.info('[AddDevice] context_config_rules = {:s}'.format(str(context_config_rules)))

        # TODO: Compute diff between current context config and device config. The one in device is of higher priority
        # (might happen another instance is updating config and context was not already updated)

        update_config(self.database, device_uuid, 'running', running_config_rules)

        # NOTE(review): unlike the running config, the initial config is handed to
        # update_config() exactly as returned by the driver -- confirm the driver
        # already returns it in the (action, key, value) form update_config() expects.
        initial_config_rules = driver.GetInitialConfig()
        update_config(self.database, device_uuid, 'initial', initial_config_rules)

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Apply the config rules of the request to the device through its driver.

        SET rules are forwarded only when the key is not in the known running
        config or its value differs; DELETE rules only when the key is known.
        The driver stages (set, delete, subscribe, unsubscribe) run in that
        order and processing stops at the first stage that reports errors.

        Args:
            request: device carrying the config rules to apply.
            context: gRPC servicer context (not used by this method).

        Returns:
            DeviceId built from the identifier dumped by the local device object.

        Raises:
            OperationFailedException: if no driver is cached for the device or
                any driver stage reports errors.
        """
        device_uuid = request.device_id.device_uuid.uuid

        sync_device_from_context(device_uuid, self.context_client, self.database)

        # Index the known running config by key to diff it against the request.
        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
        context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
        LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))

        # NOTE: the local database is updated before the driver is configured.
        db_device,_ = update_device_in_local_database(self.database, request)

        request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
        LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))

        resources_to_set         : List[Tuple[str, Any]]          = [] # key, value
        resources_to_delete      : List[str]                      = [] # key
        resources_to_subscribe   : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval

        for action, key, value in request_config_rules:
            if action == ORM_ConfigActionEnum.SET:
                if (key not in context_config_rules) or (context_config_rules[key] != value):
                    resources_to_set.append((key, value))
            elif action == ORM_ConfigActionEnum.DELETE:
                if key in context_config_rules:
                    resources_to_delete.append(key)

        # TODO: Implement configuration of subscriptions
        # TODO: use of datastores (might be virtual ones) to enable rollbacks

        errors = []

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
        else:
            # Each stage pairs a driver operation with its result checker and the
            # resource list BOTH of them receive, so a checker can never be handed
            # the list that belongs to a different stage.
            stages = (
                (driver.SetConfig,        check_set_errors,         resources_to_set),
                (driver.DeleteConfig,     check_delete_errors,      resources_to_delete),
                (driver.SubscribeState,   check_subscribe_errors,   resources_to_subscribe),
                (driver.UnsubscribeState, check_unsubscribe_errors, resources_to_unsubscribe),
            )
            for driver_operation, check_errors, resources in stages:
                results = driver_operation(resources)
                errors.extend(check_errors(resources, results))
                if errors:
                    # Do not run later stages once one stage has failed.
                    break

        if errors:
            raise OperationFailedException('ConfigureDevice', extra_details=errors)

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Delete a device, its driver instance and all its local database objects.

        Args:
            request: identifier of the device to delete.
            context: gRPC servicer context (not used by this method).

        Returns:
            Empty, also when the device is not found in the local database.
        """
        device_uuid = request.device_uuid.uuid

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None:
            # Nothing known about this device: deletion is a no-op.
            return Empty()

        self.driver_instance_cache.delete(device_uuid)
        delete_device_from_context(db_device, self.context_client)

        # Delete the objects that reference the device before the device itself.
        for db_endpoint_pk,_ in db_device.references(EndPointModel):
            EndPointModel(self.database, db_endpoint_pk).delete()

        for db_driver_pk,_ in db_device.references(DriverModel):
            DriverModel(self.database, db_driver_pk).delete()

        # Delete the rules of both configs; the config objects themselves are
        # deleted only after the device that points to them.
        db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
        for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, db_config_rule_pk).delete()

        db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
        for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
            ConfigRuleModel(self.database, db_config_rule_pk).delete()

        db_device.delete()
        db_initial_config.delete()
        db_running_config.delete()
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        """Return the initial configuration recorded for a device.

        Args:
            request: identifier of the device.
            context: gRPC servicer context (not used by this method).

        Returns:
            DeviceConfig with the rules dumped by the local device object, or
            an empty DeviceConfig when the device is not in the local database.
        """
        device_uuid = request.device_uuid.uuid

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

        # config_rules is a repeated field, so "no rules" is an empty list.
        config_rules = [] if db_device is None else db_device.dump_initial_config()
        return DeviceConfig(config_rules=config_rules)