# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, re
from typing import Any, Dict, List, Tuple
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.orm.Database import Database
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
from .MonitoringLoops import MonitoringLoops

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Device', 'RPC')

class DeviceServiceServicerImpl(DeviceServiceServicer):
    """gRPC servicer implementing the Device service RPCs.

    Every RPC brackets its work between ``mutex_queues.wait_my_turn(name)`` and
    ``mutex_queues.signal_done(name)`` using the device UUID taken from the request as queue name.
    """

    def __init__(
        self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops
    ) -> None:
        """Store the collaborators and create the Context client and the mutex queues.

        Args:
            database: local ORM database used to store device-related models.
            driver_instance_cache: cache from which device drivers are obtained and deleted.
            monitoring_loops: registry of monitoring loops (entries are added/removed per device UUID).
        """
        LOGGER.debug('Creating Servicer...')
        self.context_client = ContextClient()
        self.database = database
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        self.mutex_queues = MutexQueues()
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Add a device: connect its driver, import endpoints and configuration, and sync to Context.

        Only custom SET config rules whose resource key starts with ``_connect/`` are accepted; the
        keys ``address``, ``port`` and ``settings`` (a JSON string) are forwarded to the driver cache.

        Args:
            request: device to add; must carry no endpoints and only ``_connect/`` config rules.
            context: gRPC servicer context (not used directly by this method).

        Returns:
            The identifier of the device as stored in the local database.

        Raises:
            InvalidArgumentException: if the request carries config rules other than ``_connect/``
                ones, carries endpoints, or ``_connect/settings`` is not valid JSON.
        """
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        connection_config_rules = {}
        unexpected_config_rules = []
        for config_rule in request.device_config.config_rules:
            if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
               (config_rule.WhichOneof('config_rule') == 'custom') and \
               (config_rule.custom.resource_key.startswith('_connect/')):
                connection_config_rules[
                    config_rule.custom.resource_key.replace('_connect/', '')
                ] = config_rule.custom.resource_value
            else:
                unexpected_config_rules.append(config_rule)
        if len(unexpected_config_rules) > 0:
            # Report exactly the rules rejected by the loop above. Re-deriving them from the JSON of the
            # whole device_config by indexing ['custom']['resource_key'] fails with KeyError for rules
            # that are not 'custom', and may hide a rejected rule sharing its key with an accepted one.
            unexpected_config_rules = [
                grpc_message_to_json(unexpected_config_rule)
                for unexpected_config_rule in unexpected_config_rules
            ]
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                              'with "_connect/" tag. Others should be configured after adding the device.')

        if len(request.device_endpoints) > 0:
            unexpected_endpoints = [
                grpc_message_to_json(device_endpoint)
                for device_endpoint in request.device_endpoints
            ]
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'device.device_endpoints', str_unexpected_endpoints,
                extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
                              'interrogation of the physical device.')

        # Remove device configuration
        json_request = grpc_message_to_json(request, use_integers_for_enums=True)
        json_request['device_config'] = {}
        request = Device(**json_request)

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device,_ = update_device_in_local_database(self.database, request)

            driver_filter_fields = get_device_driver_filter_fields(db_device)

            #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
            address  = connection_config_rules.pop('address', None)
            port     = connection_config_rules.pop('port', None)
            settings = connection_config_rules.pop('settings', '{}')
            try:
                settings = json.loads(settings)
            except ValueError as e:
                raise InvalidArgumentException(
                    'device.device_config.config_rules[settings]', settings,
                    extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
            driver : _Driver = self.driver_instance_cache.get(
                device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
            driver.Connect()

            endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
            try:
                for resource_key, resource_value in endpoints:
                    if isinstance(resource_value, Exception):
                        LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
                        continue
                    endpoint_uuid = resource_value.get('uuid')
                    endpoint_type = resource_value.get('type')
                    str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                    db_endpoint, _ = update_or_create_object(
                        self.database, EndPointModel, str_endpoint_key, {
                        'device_fk'    : db_device,
                        'endpoint_uuid': endpoint_uuid,
                        'endpoint_type': endpoint_type,
                        'resource_key' : resource_key,
                    })
                    sample_types : Dict[int, str] = resource_value.get('sample_types', {})
                    for sample_type, monitor_resource_key in sample_types.items():
                        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                        update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
                            'endpoint_fk'    : db_endpoint,
                            'resource_key'   : monitor_resource_key,
                            'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
                        })
            except Exception: # pylint: disable=broad-except
                # Best effort: a failure while importing endpoints is logged and the device is still
                # added. Only Exception is caught so KeyboardInterrupt/SystemExit keep propagating.
                LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))

            raw_running_config_rules = driver.GetConfig()
            running_config_rules = []
            for resource_key, resource_value in raw_running_config_rules:
                if isinstance(resource_value, Exception):
                    msg = 'Error retrieving config rules: {:s} => {:s}'
                    LOGGER.error(msg.format(str(resource_key), str(resource_value)))
                    continue
                config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
                running_config_rules.append(config_rule)

            #for running_config_rule in running_config_rules:
            #    LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
            update_config(self.database, device_uuid, 'running', running_config_rules)

            initial_config_rules = driver.GetInitialConfig()
            update_config(self.database, device_uuid, 'initial', initial_config_rules)

            #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
            #    include_config_rules=True, include_drivers=True, include_endpoints=True))))

            sync_device_to_context(db_device, self.context_client)
            return DeviceId(**db_device.dump_id())
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Apply the requested config rules to the device through its driver and refresh the stored config.

        SET rules are sent to the driver only when the key is not in the stored 'running' config or its
        value differs; DELETE rules only when the key is present in the stored 'running' config.
        Deletions are attempted only if the set step produced no errors.

        Args:
            request: device with the config rules to apply.
            context: gRPC servicer context (not used directly by this method).

        Returns:
            The identifier of the device as stored in the local database.

        Raises:
            OperationFailedException: if no driver is cached for the device, or the driver reported
                errors while setting/deleting resources.
        """
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)

            # Each stored rule is a tuple (action, key, value); index the values by key.
            context_config_rules = get_config_rules(self.database, device_uuid, 'running')
            context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
            #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))

            db_device,_ = update_device_in_local_database(self.database, request)

            request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
            #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))

            resources_to_set    : List[Tuple[str, Any]] = [] # key, value
            resources_to_delete : List[Tuple[str, Any]] = [] # key, value

            for config_rule in request_config_rules:
                action, key, value = config_rule
                if action == ORM_ConfigActionEnum.SET:
                    if (key not in context_config_rules) or (context_config_rules[key] != value):
                        resources_to_set.append((key, value))
                elif action == ORM_ConfigActionEnum.DELETE:
                    if key in context_config_rules:
                        resources_to_delete.append((key, value))

            #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
            #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))

            # TODO: use of datastores (might be virtual ones) to enable rollbacks

            errors = []

            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))

            # Each step runs only if no error has been collected so far.
            if len(errors) == 0:
                results_setconfig = driver.SetConfig(resources_to_set)
                errors.extend(check_set_errors(resources_to_set, results_setconfig))

            if len(errors) == 0:
                results_deleteconfig = driver.DeleteConfig(resources_to_delete)
                errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))

            if len(errors) > 0:
                raise OperationFailedException('ConfigureDevice', extra_details=errors)

            # Re-read the configuration from the driver, skipping entries reported as exceptions.
            running_config_rules = driver.GetConfig()
            running_config_rules = [
                (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
                for config_rule in running_config_rules if not isinstance(config_rule[1], Exception)
            ]
            #for running_config_rule in running_config_rules:
            #    LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
            update_config(self.database, device_uuid, 'running', running_config_rules)

            sync_device_to_context(db_device, self.context_client)
            return DeviceId(**db_device.dump_id())
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Delete a device: its monitoring loop entry, cached driver, Context entry and local records.

        Local records removed are the device's KPIs (with their endpoint-monitor relations), endpoints
        (with their monitors), drivers, config rules, the device itself and its two config objects.

        Args:
            request: identifier of the device to delete.
            context: gRPC servicer context (not used directly by this method).

        Returns:
            ``Empty``; also when the device is not found in the local database.
        """
        device_uuid = request.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            self.monitoring_loops.remove(device_uuid)

            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
            if db_device is None: return Empty()

            self.driver_instance_cache.delete(device_uuid)
            delete_device_from_context(db_device, self.context_client)

            for db_kpi_pk,_ in db_device.references(KpiModel):
                db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
                for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
                    get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
                db_kpi.delete()

            for db_endpoint_pk,_ in db_device.references(EndPointModel):
                db_endpoint = EndPointModel(self.database, db_endpoint_pk)
                for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
                    get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
                db_endpoint.delete()

            for db_driver_pk,_ in db_device.references(DriverModel):
                get_object(self.database, DriverModel, db_driver_pk).delete()

            db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
            for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

            db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
            for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
                get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()

            # The config objects are deleted after the device that holds the foreign keys to them.
            db_device.delete()
            db_initial_config.delete()
            db_running_config.delete()
            return Empty()
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        """Return the initial configuration stored for a device.

        Args:
            request: identifier of the device.
            context: gRPC servicer context (not used directly by this method).

        Returns:
            A ``DeviceConfig`` with the stored initial config rules; it carries no rules when the
            device is not found in the local database.
        """
        device_uuid = request.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            sync_device_from_context(device_uuid, self.context_client, self.database)
            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

            # config_rules is a repeated field: use an empty list (not a dict) when there is no device.
            config_rules = [] if db_device is None else db_device.dump_initial_config()
            device_config = DeviceConfig(config_rules=config_rules)
            return device_config
        finally:
            self.mutex_queues.signal_done(device_uuid)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        """Subscribe to, or unsubscribe from, the monitoring of a KPI on a device endpoint.

        When both ``sampling_duration_s`` and ``sampling_interval_s`` are positive the KPI is stored and
        the driver is asked to subscribe to the matching endpoint-monitor resource; otherwise the stored
        KPI identified by ``kpi_id`` is looked up, unsubscribed in the driver and deleted.

        Args:
            request: monitoring settings (KPI id, KPI descriptor, sampling duration and interval).
            context: gRPC servicer context (not used directly by this method).

        Returns:
            ``Empty`` on success.

        Raises:
            OperationFailedException: if the device, endpoint, endpoint monitor, KPI or driver involved
                cannot be found, or the driver reports (un)subscription errors.
        """
        kpi_uuid = request.kpi_id.kpi_id.uuid
        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid

        # Keep the queue name in its own variable: the unsubscribe branch rebinds device_uuid to the
        # UUID stored with the KPI, and signal_done() has to target the same queue as wait_my_turn().
        mutex_queue_name = device_uuid
        self.mutex_queues.wait_my_turn(mutex_queue_name)
        try:
            subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
            if subscribe:
                db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
                if db_device is None:
                    msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                endpoint_id = request.kpi_descriptor.endpoint_id
                endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
                endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
                endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
                if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
                    # A fully specified topology becomes part of the endpoint key.
                    str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
                    str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
                db_endpoint : EndPointModel = get_object(
                    self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
                if db_endpoint is None:
                    msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
                        str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                driver : _Driver = self.driver_instance_cache.get(device_uuid)
                if driver is None:
                    msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                sample_type = request.kpi_descriptor.kpi_sample_type

                attributes = {
                    'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
                    'kpi_description'  : request.kpi_descriptor.kpi_description,
                    'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
                    'device_fk'        : db_device,
                    'endpoint_fk'      : db_endpoint,
                    'sampling_duration': request.sampling_duration_s,
                    'sampling_interval': request.sampling_interval_s,
                }
                result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
                db_kpi, updated = result

                str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                db_endpoint_monitor : EndPointMonitorModel = get_object(
                    self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
                if db_endpoint_monitor is None:
                    msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
                        str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
                        str(device_uuid), str(endpoint_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                # Non-alphanumeric characters of the resource key are replaced by '.' to build the key.
                endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
                str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
                attributes = {
                    'endpoint_monitor_fk': db_endpoint_monitor,
                    'kpi_fk'             : db_kpi,
                }
                result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
                    self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
                db_endpoint_monitor_kpi, updated = result

                resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
                resources_to_subscribe.append(
                    (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
                results_subscribestate = driver.SubscribeState(resources_to_subscribe)
                errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
                if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

                self.monitoring_loops.add(device_uuid, driver)

            else:
                db_kpi : KpiModel = get_object(
                    self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
                if db_kpi is None:
                    msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                db_device : DeviceModel = get_object(
                    self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
                if db_device is None:
                    msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
                # From here on use the device the KPI was stored with, not the one named in the request.
                device_uuid = db_device.device_uuid

                db_endpoint : EndPointModel = get_object(
                    self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
                if db_endpoint is None:
                    msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
                endpoint_uuid = db_endpoint.endpoint_uuid
                str_endpoint_key = db_endpoint.pk

                kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
                sample_type = kpi_sample_type.value
                str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
                db_endpoint_monitor : EndPointMonitorModel = get_object(
                    self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
                if db_endpoint_monitor is None:
                    msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
                str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
                db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                    self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
                if db_endpoint_monitor_kpi is None:
                    msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
                resources_to_unsubscribe.append(
                    (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))

                driver : _Driver = self.driver_instance_cache.get(device_uuid)
                if driver is None:
                    msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
                errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
                if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

                db_endpoint_monitor_kpi.delete()
                db_kpi.delete()

                # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
                # requests.
                #self.monitoring_loops.remove(device_uuid)

            # Subscriptions are not stored as classical driver config.
            # TODO: consider adding it somehow in the configuration.
            # Warning: GetConfig might be very slow in OpenConfig devices
            #running_config_rules = [
            #    (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
            #    for config_rule in driver.GetConfig()
            #]
            #context_config_rules = {
            #    config_rule[1]: config_rule[2]
            #    for config_rule in get_config_rules(self.database, device_uuid, 'running')
            #}

            ## each in context, not in running => delete in context
            ## each in running, not in context => add to context
            ## each in context and in running, context.value != running.value => update in context
            #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
            #for config_rule_key,config_rule_value in running_config_rules:
            #    running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
            #    context_config_rules.pop(config_rule_key, None)
            #for context_rule_key,context_rule_value in context_config_rules.items():
            #    running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))

            ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
            ##for i,running_config_rules_action in enumerate(running_config_rules_actions):
            ##    LOGGER.info(msg.format(i, str(running_config_rules_action)))
            #update_config(self.database, device_uuid, 'running', running_config_rules_actions)

            sync_device_to_context(db_device, self.context_client)
            return Empty()
        finally:
            self.mutex_queues.signal_done(mutex_queue_name)