import grpc, json, logging from typing import Any, List, Tuple from google.protobuf.json_format import MessageToDict from common.orm.Database import Database from common.orm.Factory import get_database_backend from common.orm.HighLevel import get_object, update_or_create_object from common.orm.backend.BackendEnum import BackendEnum from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException from context.client.ContextClient import ContextClient from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty #from device.proto.device_pb2 import MonitoringSettings from device.proto.device_pb2_grpc import DeviceServiceServicer #from .MonitoringLoops import MonitoringLoops from .database.ConfigModel import ( ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config) from .database.DatabaseTools import ( delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context, update_device_in_local_database) from .database.DeviceModel import DeviceModel, DriverModel from .database.EndPointModel import EndPointModel #from .database.KpiModel import KpiModel #from .database.KpiSampleType import grpc_to_enum__kpi_sample_type from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from .driver_api.DriverInstanceCache import DriverInstanceCache from .driver_api.Tools import ( check_delete_errors, check_set_errors, #check_subscribe_errors, check_unsubscribe_errors ) LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'Device' METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DeviceServiceServicerImpl(DeviceServiceServicer): def __init__( self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache, #monitoring_loops : MonitoringLoops ): LOGGER.debug('Creating Servicer...') self.context_client = context_client self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) self.driver_instance_cache = driver_instance_cache #self.monitoring_loops = monitoring_loops LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS, LOGGER) def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: device_id = request.device_id device_uuid = device_id.device_uuid.uuid connection_config_rules = {} unexpected_config_rules = [] for config_rule in request.device_config.config_rules: if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \ (config_rule.resource_key.startswith('_connect/')): connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value else: unexpected_config_rules.append(config_rule) if len(unexpected_config_rules) > 0: unexpected_config_rules = MessageToDict( request.device_config, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=True) unexpected_config_rules = unexpected_config_rules['config_rules'] unexpected_config_rules = list(filter( lambda cr: cr['resource_key'].replace('_connect/', '') not in connection_config_rules, unexpected_config_rules)) str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True) raise InvalidArgumentException( 'device.device_config.config_rules', str_unexpected_config_rules, extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\ 'with "_connect/" tag. Others should be configured after adding the device.') if len(request.device_endpoints) > 0: unexpected_endpoints = MessageToDict( request.device_endpoints, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=True) str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True) raise InvalidArgumentException( 'device.device_endpoints', str_unexpected_endpoints, extra_details='RPC method AddDevice does not accept endpoints. Endpoints are discovered through '\ 'interrogation of the physical device.') # Remove device configuration json_request = MessageToDict( request, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=True) json_request['device_config'] = {} request = Device(**json_request) sync_device_from_context(device_uuid, self.context_client, self.database) db_device,_ = update_device_in_local_database(self.database, request) driver_filter_fields = get_device_driver_filter_fields(db_device) address = connection_config_rules.pop('address', None) port = connection_config_rules.pop('port', None) driver : _Driver = self.driver_instance_cache.get( device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=connection_config_rules) driver.Connect() endpoints = driver.GetConfig([RESOURCE_ENDPOINTS]) for _, resource_value in endpoints: endpoint_uuid = resource_value.get('name') endpoint_type = resource_value.get('type') str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) update_or_create_object( self.database, EndPointModel, str_endpoint_key, { 'device_fk' : db_device, 'endpoint_uuid': endpoint_uuid, 'endpoint_type': endpoint_type, }) running_config_rules = driver.GetConfig([RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES]) running_config_rules = [ (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) for config_rule in running_config_rules ] #for running_config_rule in running_config_rules: # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) update_config(self.database, device_uuid, 'running', running_config_rules) initial_config_rules = driver.GetInitialConfig() update_config(self.database, device_uuid, 'initial', initial_config_rules) sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: device_id = request.device_id device_uuid = device_id.device_uuid.uuid sync_device_from_context(device_uuid, self.context_client, self.database) context_config_rules = get_config_rules(self.database, device_uuid, 'running') context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) db_device,_ = update_device_in_local_database(self.database, request) request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) resources_to_set : List[Tuple[str, Any]] = [] # key, value resources_to_delete : List[Tuple[str, Any]] = [] # key, value for config_rule in request_config_rules: action, key, value = config_rule if action == ORM_ConfigActionEnum.SET: if (key not in context_config_rules) or (context_config_rules[key] != value): resources_to_set.append((key, value)) elif action == ORM_ConfigActionEnum.DELETE: if key in context_config_rules: resources_to_delete.append((key, value)) LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) # TODO: use of datastores (might be virtual ones) to enable rollbacks errors = [] driver : _Driver = self.driver_instance_cache.get(device_uuid) if driver is None: errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid))) if len(errors) == 0: results_setconfig = driver.SetConfig(resources_to_set) errors.extend(check_set_errors(resources_to_set, results_setconfig)) if len(errors) == 0: results_deleteconfig = driver.DeleteConfig(resources_to_delete) errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig)) if len(errors) > 0: raise OperationFailedException('ConfigureDevice', extra_details=errors) sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @safe_and_metered_rpc_method(METRICS, LOGGER) def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: device_uuid = request.device_uuid.uuid sync_device_from_context(device_uuid, self.context_client, self.database) db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) if db_device is None: return Empty() self.driver_instance_cache.delete(device_uuid) delete_device_from_context(db_device, self.context_client) for db_endpoint_pk,_ in db_device.references(EndPointModel): EndPointModel(self.database, db_endpoint_pk).delete() for db_driver_pk,_ in db_device.references(DriverModel): DriverModel(self.database, db_driver_pk).delete() db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk) for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel): ConfigRuleModel(self.database, db_config_rule_pk).delete() db_running_config = ConfigModel(self.database, db_device.device_running_config_fk) for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): ConfigRuleModel(self.database, db_config_rule_pk).delete() db_device.delete() db_initial_config.delete() db_running_config.delete() return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig: device_uuid = request.device_uuid.uuid sync_device_from_context(device_uuid, self.context_client, self.database) db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) config_rules = {} if db_device is None else db_device.dump_initial_config() return DeviceConfig(config_rules=config_rules) # # Code under implemention and testing # @safe_and_metered_rpc_method(METRICS, LOGGER) # def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: # kpi_uuid = request.kpi_id.kpi_id.uuid # # device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid # db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) # # endpoint_id = request.kpi_descriptor.endpoint_id # endpoint_uuid = endpoint_id.endpoint_uuid.uuid # endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid # if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid # str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) # endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid # endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid # if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: # str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) # str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') # db_endpoint : EndPointModel = get_object( # self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) # # #db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False) # result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, { # 'kpi_uuid' : request.kpi_id.kpi_id.uuid, # 'kpi_description' : request.kpi_descriptor.kpi_description, # 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type), # 'device_fk' : db_device, # 'endpoint_fk' : db_endpoint, # 'sampling_duration': request.sampling_duration_s, # 'sampling_interval': request.sampling_interval_s, # }) # db_kpi, updated = result # # driver : _Driver = self.driver_instance_cache.get(device_uuid) # if driver is None: # msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) # raise OperationFailedException('ConfigureDevice', extra_details=msg) # # sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid) # # #resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval # #resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval # #LOGGER.info('[ConfigureDevice] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe))) # #LOGGER.info('[ConfigureDevice] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe))) # # TODO: Implement configuration of subscriptions # # #if len(errors) == 0: # # results_subscribestate = driver.SubscribeState(resources_to_subscribe) # # errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate)) # # #if len(errors) == 0: # # results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) # # errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate)) # # results = driver.SubscribeState([ # (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval), # ]) # assert len(results) == 4 # for result in results: assert isinstance(result, bool) and result # # self.monitoring_loops.add(device_uuid, driver) # # return Empty()