Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 23.7 KiB
Newer Older
import grpc, json, logging, re
from typing import Any, Dict, List, Tuple
from common.orm.Database import Database
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json
from context.client.ContextClient import ContextClient
from context.proto.kpi_sample_types_pb2 import KpiSampleType
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
from .MonitoringLoops import MonitoringLoops
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class DeviceServiceServicerImpl(DeviceServiceServicer):
        self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops):
        self.context_client = context_client
        self.database = database
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        connection_config_rules = {}
        unexpected_config_rules = []
        for config_rule in request.device_config.config_rules:
            if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
               (config_rule.resource_key.startswith('_connect/')):
                connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value
            else:
                unexpected_config_rules.append(config_rule)
        if len(unexpected_config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.device_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            unexpected_config_rules = list(filter(
                lambda cr: cr['resource_key'].replace('_connect/', '') not in connection_config_rules,
                unexpected_config_rules))
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                              'with "_connect/" tag. Others should be configured after adding the device.')

        if len(request.device_endpoints) > 0:
            unexpected_endpoints = []
            for device_endpoint in request.device_endpoints:
                unexpected_endpoints.append(grpc_message_to_json(device_endpoint))
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_endpoints', str_unexpected_endpoints,
                extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
                              'interrogation of the physical device.')

        # Remove device configuration
        json_request = grpc_message_to_json(request, use_integers_for_enums=True)
        json_request['device_config'] = {}
        request = Device(**json_request)

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device,_ = update_device_in_local_database(self.database, request)

        driver_filter_fields = get_device_driver_filter_fields(db_device)

        #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
        address  = connection_config_rules.pop('address', None)
        port     = connection_config_rules.pop('port', None)
        settings = connection_config_rules.pop('settings', '{}')
        try:
            settings = json.loads(settings)
        except ValueError as e:
            raise InvalidArgumentException(
                'device.device_config.config_rules[settings]', settings,
                extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
        driver : _Driver = self.driver_instance_cache.get(
            device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
        driver.Connect()

        endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
        try:
            for resource_key, resource_value in endpoints:
                if isinstance(resource_value, Exception):
                    LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
                    continue
                endpoint_uuid = resource_value.get('uuid')
                endpoint_type = resource_value.get('type')
                str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                db_endpoint, _ = update_or_create_object(
                    self.database, EndPointModel, str_endpoint_key, {
                    'device_fk'    : db_device,
                    'endpoint_uuid': endpoint_uuid,
                    'endpoint_type': endpoint_type,
                    'resource_key' : resource_key,
                sample_types : Dict[int, str] = resource_value.get('sample_types', {})
                for sample_type, monitor_resource_key in sample_types.items():
                    str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                    update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
                        'endpoint_fk'    : db_endpoint,
                        'resource_key'   : monitor_resource_key,
                        'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
                    })
        except: # pylint: disable=bare-except
            LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))

        raw_running_config_rules = driver.GetConfig()
        running_config_rules = []
        for resource_key, resource_value in raw_running_config_rules:
            if isinstance(resource_value, Exception):
                msg = 'Error retrieving config rules: {:s} => {:s}'
                LOGGER.error(msg.format(str(resource_key), str(resource_value)))
                continue
            config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
            running_config_rules.append(config_rule)
        #for running_config_rule in running_config_rules:
        #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
        update_config(self.database, device_uuid, 'running', running_config_rules)

        initial_config_rules = driver.GetInitialConfig()
        update_config(self.database, device_uuid, 'initial', initial_config_rules)

        #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
        #    include_config_rules=True, include_drivers=True, include_endpoints=True))))

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        sync_device_from_context(device_uuid, self.context_client, self.database)

        context_config_rules = get_config_rules(self.database, device_uuid, 'running')
        context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
        #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))

        db_device,_ = update_device_in_local_database(self.database, request)

        request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
        #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
        resources_to_set    : List[Tuple[str, Any]] = [] # key, value
        resources_to_delete : List[Tuple[str, Any]] = [] # key, value

        for config_rule in request_config_rules:
            action, key, value = config_rule
            if action == ORM_ConfigActionEnum.SET:
                if (key not in context_config_rules) or (context_config_rules[key] != value):
                    resources_to_set.append((key, value))
            elif action == ORM_ConfigActionEnum.DELETE:
                if key in context_config_rules:
                    resources_to_delete.append((key, value))

        #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
        #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))

        # TODO: use of datastores (might be virtual ones) to enable rollbacks

        errors = []

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))

        if len(errors) == 0:
            results_setconfig = driver.SetConfig(resources_to_set)
            errors.extend(check_set_errors(resources_to_set, results_setconfig))

        if len(errors) == 0:
            results_deleteconfig = driver.DeleteConfig(resources_to_delete)
            errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))

        if len(errors) > 0:
            raise OperationFailedException('ConfigureDevice', extra_details=errors)

        running_config_rules = driver.GetConfig()
        running_config_rules = [
            (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
            for config_rule in running_config_rules
        ]
        #for running_config_rule in running_config_rules:
        #    LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
        update_config(self.database, device_uuid, 'running', running_config_rules)

        sync_device_to_context(db_device, self.context_client)
        return DeviceId(**db_device.dump_id())

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid

        self.monitoring_loops.remove(device_uuid)

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None: return Empty()

        self.driver_instance_cache.delete(device_uuid)
        delete_device_from_context(db_device, self.context_client)

        for db_kpi_pk,_ in db_device.references(KpiModel):
            db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
            for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
                get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
            db_kpi.delete()
        for db_endpoint_pk,_ in db_device.references(EndPointModel):
            db_endpoint = EndPointModel(self.database, db_endpoint_pk)
            for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
                get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
            db_endpoint.delete()

        for db_driver_pk,_ in db_device.references(DriverModel):
            get_object(self.database, DriverModel, db_driver_pk).delete()

        db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
        for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
            get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

        db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
        for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
            get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

        db_device.delete()
        db_initial_config.delete()
        db_running_config.delete()
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        device_uuid = request.device_uuid.uuid

        sync_device_from_context(device_uuid, self.context_client, self.database)
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

        config_rules = {} if db_device is None else db_device.dump_initial_config()
        return DeviceConfig(config_rules=config_rules)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        kpi_uuid = request.kpi_id.kpi_id.uuid

        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
        if subscribe:
            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid

            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
            if db_device is None:
                msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            endpoint_id = request.kpi_descriptor.endpoint_id
            endpoint_uuid = endpoint_id.endpoint_uuid.uuid
            str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
            endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
            endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
            if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
            db_endpoint : EndPointModel = get_object(
                self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
            if db_endpoint is None:
                msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
                    str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            sample_type = request.kpi_descriptor.kpi_sample_type

            attributes = {
                'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
                'kpi_description'  : request.kpi_descriptor.kpi_description,
                'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
                'device_fk'        : db_device,
                'endpoint_fk'      : db_endpoint,
                'sampling_duration': request.sampling_duration_s,
                'sampling_interval': request.sampling_interval_s,
            }
            result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
            db_kpi, updated = result

            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
            db_endpoint_monitor : EndPointMonitorModel = get_object(
                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
            if db_endpoint_monitor is None:
                msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
                    str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
                    str(device_uuid), str(endpoint_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
            attributes = {
                'endpoint_monitor_fk': db_endpoint_monitor,
                'kpi_fk'             : db_kpi,
            }
            result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
            db_endpoint_monitor_kpi, updated = result

            resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            resources_to_subscribe.append(
                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
            results_subscribestate = driver.SubscribeState(resources_to_subscribe)
            errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

            self.monitoring_loops.add(device_uuid, driver)

        else:
            db_kpi : KpiModel = get_object(
                self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
            if db_kpi is None:
                msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            db_device : DeviceModel = get_object(
                self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
            if db_device is None:
                msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = db_device.device_uuid

            db_endpoint : EndPointModel = get_object(
                self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
            if db_endpoint is None:
                msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            endpoint_uuid = db_endpoint.endpoint_uuid
            str_endpoint_key = db_endpoint.pk

            kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
            sample_type = kpi_sample_type.value
            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
            db_endpoint_monitor : EndPointMonitorModel = get_object(
                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
            if db_endpoint_monitor is None:
                msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
                msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            resources_to_unsubscribe.append(
                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))

            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
            errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

            db_endpoint_monitor_kpi.delete()
            db_kpi.delete()

            # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
            # requests.
            #self.monitoring_loops.remove(device_uuid)

        running_config_rules = [
            (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
            for config_rule in driver.GetConfig()
        ]
        context_config_rules = {
            config_rule[1]: config_rule[2]
            for config_rule in get_config_rules(self.database, device_uuid, 'running')
        }

        # each in context, not in running => delete in context
        # each in running, not in context => add to context
        # each in context and in running, context.value != running.value => update in context

        running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
        for config_rule_key,config_rule_value in running_config_rules:
            running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
            context_config_rules.pop(config_rule_key, None)
        for context_rule_key,context_rule_value in context_config_rules.items():
            running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))

        #msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
        #for i,running_config_rules_action in enumerate(running_config_rules_actions):
        #    LOGGER.info(msg.format(i, str(running_config_rules_action)))
        update_config(self.database, device_uuid, 'running', running_config_rules_actions)

        sync_device_to_context(db_device, self.context_client)