diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index 59134f26d3dd8c3fa0a9dddbcd1d3df298ec076a..ca165a200ec09961b10f5892107020682d8c7658 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -14,14 +14,11 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from common.orm.backend.BackendEnum import BackendEnum -from common.orm.Database import Database -from common.orm.Factory import get_database_backend from common.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server from common.tools.service.GenericGrpcService import GenericGrpcService from .driver_api.DriverInstanceCache import DriverInstanceCache from .DeviceServiceServicerImpl import DeviceServiceServicerImpl -from .MonitoringLoops import MonitoringLoops +from .monitoring.MonitoringLoops import MonitoringLoops # Custom gRPC settings # Multiple clients might keep connections alive waiting for RPC methods to be executed. @@ -32,9 +29,8 @@ class DeviceService(GenericGrpcService): def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.DEVICE) super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name) - database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) - self.monitoring_loops = MonitoringLoops(database) - self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops) + self.monitoring_loops = MonitoringLoops() + self.device_servicer = DeviceServiceServicerImpl(driver_instance_cache, self.monitoring_loops) def install_servicers(self): self.monitoring_loops.start() diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 88f49de6fb5c07e39b7efc9d26ccba135f95c929..9d0f9bd3ec018c707b9c26c9d61663043589d9f5 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -12,47 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, json, logging, re -from typing import Any, Dict, List, Tuple +import grpc, logging from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from common.method_wrappers.ServiceExceptions import InvalidArgumentException, OperationFailedException -from common.orm.Database import Database -from common.orm.HighLevel import get_object, update_or_create_object -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty +from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException +from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty from common.proto.device_pb2 import MonitoringSettings from common.proto.device_pb2_grpc import DeviceServiceServicer -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.tools.grpc.Tools import grpc_message_to_json +from common.tools.context_queries.Device import get_device from common.tools.mutex_queues.MutexQueues import MutexQueues from context.client.ContextClient import ContextClient -from .database.ConfigModel import ( - ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config) -from .database.DatabaseTools import ( - delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context, - update_device_in_local_database) -from .database.DeviceModel import DeviceModel, DriverModel -from .database.EndPointModel import EndPointModel, EndPointMonitorModel -from .database.KpiModel import KpiModel -from .database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type -from .database.RelationModels import EndPointMonitorKpiModel -from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES -from .driver_api.DriverInstanceCache import DriverInstanceCache -from .driver_api.Tools import ( - check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors) -from .MonitoringLoops import MonitoringLoops +from .driver_api._Driver import _Driver +from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver +from .monitoring.MonitoringLoops import MonitoringLoops +from .Tools import ( + check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules, + populate_config_rules, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi) LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Device', 'RPC') +ERROR_MISSING_DRIVER = 'Device({:s}) has not been added to this Device instance' + class DeviceServiceServicerImpl(DeviceServiceServicer): - def __init__( - self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops - ) -> None: + def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None: LOGGER.debug('Creating Servicer...') - self.context_client = ContextClient() - self.database = database self.driver_instance_cache = driver_instance_cache self.monitoring_loops = monitoring_loops self.mutex_queues = MutexQueues() @@ -63,114 +47,36 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): device_id = request.device_id device_uuid = device_id.device_uuid.uuid - connection_config_rules = {} - unexpected_config_rules = [] - for config_rule in request.device_config.config_rules: - if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \ - (config_rule.WhichOneof('config_rule') == 'custom') and \ - (config_rule.custom.resource_key.startswith('_connect/')): - connection_config_rules[ - config_rule.custom.resource_key.replace('_connect/', '') - ] = config_rule.custom.resource_value - else: - unexpected_config_rules.append(config_rule) - if len(unexpected_config_rules) > 0: - unexpected_config_rules = grpc_message_to_json(request.device_config) - unexpected_config_rules = unexpected_config_rules['config_rules'] - unexpected_config_rules = list(filter( - lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules, - unexpected_config_rules)) - str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True) - raise InvalidArgumentException( - 'device.device_config.config_rules', str_unexpected_config_rules, - extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\ - 'with "_connect/" tag. Others should be configured after adding the device.') - - if len(request.device_endpoints) > 0: - unexpected_endpoints = [] - for device_endpoint in request.device_endpoints: - unexpected_endpoints.append(grpc_message_to_json(device_endpoint)) - str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True) - raise InvalidArgumentException( - 'device.device_endpoints', str_unexpected_endpoints, - extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\ - 'interrogation of the physical device.') - - # Remove device configuration - json_request = grpc_message_to_json(request, use_integers_for_enums=True) - json_request['device_config'] = {} - request = Device(**json_request) + connection_config_rules = check_connect_rules(request.device_config) + check_no_endpoints(request.device_endpoints) + + context_client = ContextClient() + device = get_device(context_client, device_uuid, rw_copy=True) + if device is None: + # not in context, create from request + device = Device() + device.CopyFrom(request) self.mutex_queues.wait_my_turn(device_uuid) try: - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device,_ = update_device_in_local_database(self.database, request) - - driver_filter_fields = get_device_driver_filter_fields(db_device) - - #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules))) - address = connection_config_rules.pop('address', None) - port = connection_config_rules.pop('port', None) - settings = connection_config_rules.pop('settings', '{}') - try: - settings = json.loads(settings) - except ValueError as e: - raise InvalidArgumentException( - 'device.device_config.config_rules[settings]', settings, - extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e - driver : _Driver = self.driver_instance_cache.get( - device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) - driver.Connect() - - endpoints = driver.GetConfig([RESOURCE_ENDPOINTS]) - try: - for resource_key, resource_value in endpoints: - if isinstance(resource_value, Exception): - LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value))) - continue - endpoint_uuid = resource_value.get('uuid') - endpoint_type = resource_value.get('type') - str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - db_endpoint, _ = update_or_create_object( - self.database, EndPointModel, str_endpoint_key, { - 'device_fk' : db_device, - 'endpoint_uuid': endpoint_uuid, - 'endpoint_type': endpoint_type, - 'resource_key' : resource_key, - }) - sample_types : Dict[int, str] = resource_value.get('sample_types', {}) - for sample_type, monitor_resource_key in sample_types.items(): - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, { - 'endpoint_fk' : db_endpoint, - 'resource_key' : monitor_resource_key, - 'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type), - }) - except: # pylint: disable=bare-except - LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints))) - - raw_running_config_rules = driver.GetConfig() - running_config_rules = [] - for resource_key, resource_value in raw_running_config_rules: - if isinstance(resource_value, Exception): - msg = 'Error retrieving config rules: {:s} => {:s}' - LOGGER.error(msg.format(str(resource_key), str(resource_value))) - continue - config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True)) - running_config_rules.append(config_rule) - - #for running_config_rule in running_config_rules: - # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) - update_config(self.database, device_uuid, 'running', running_config_rules) - - initial_config_rules = driver.GetInitialConfig() - update_config(self.database, device_uuid, 'initial', initial_config_rules) - - #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump( - # include_config_rules=True, include_drivers=True, include_endpoints=True)))) - - sync_device_to_context(db_device, self.context_client) - return DeviceId(**db_device.dump_id()) + driver : _Driver = get_driver(self.driver_instance_cache, device) + + errors = [] + + if len(device.device_endpoints) == 0: + # created from request, populate endpoints using driver + errors.extend(populate_endpoints(device, driver, self.monitoring_loops)) + + if len(device.device_config.config_rules) == len(connection_config_rules): + # created from request, populate config rules using driver + errors.extend(populate_config_rules(device, driver)) + + if len(errors) > 0: + for error in errors: LOGGER.error(error) + raise OperationFailedException('AddDevice', extra_details=errors) + + device_id = context_client.SetDevice(device) + return device_id finally: self.mutex_queues.signal_done(device_uuid) @@ -181,107 +87,52 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): self.mutex_queues.wait_my_turn(device_uuid) try: - sync_device_from_context(device_uuid, self.context_client, self.database) - - context_config_rules = get_config_rules(self.database, device_uuid, 'running') - context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} - #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) - - db_device,_ = update_device_in_local_database(self.database, request) - - request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) - #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) - - resources_to_set : List[Tuple[str, Any]] = [] # key, value - resources_to_delete : List[Tuple[str, Any]] = [] # key, value - - for config_rule in request_config_rules: - action, key, value = config_rule - if action == ORM_ConfigActionEnum.SET: - if (key not in context_config_rules) or (context_config_rules[key] != value): - resources_to_set.append((key, value)) - elif action == ORM_ConfigActionEnum.DELETE: - if key in context_config_rules: - resources_to_delete.append((key, value)) - - #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) - #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) - - # TODO: use of datastores (might be virtual ones) to enable rollbacks - - errors = [] + context_client = ContextClient() + device = get_device(context_client, device_uuid, rw_copy=True) + if device is None: + raise NotFoundException('Device', device_uuid, extra_details='loading in ConfigureDevice') driver : _Driver = self.driver_instance_cache.get(device_uuid) if driver is None: - errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid))) + msg = ERROR_MISSING_DRIVER.format(str(device_uuid)) + raise OperationFailedException('ConfigureDevice', extra_details=msg) - if len(errors) == 0: - results_setconfig = driver.SetConfig(resources_to_set) - errors.extend(check_set_errors(resources_to_set, results_setconfig)) + # TODO: use of datastores (might be virtual ones) to enable rollbacks + resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request) - if len(errors) == 0: - results_deleteconfig = driver.DeleteConfig(resources_to_delete) - errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig)) + errors = [] + errors.extend(configure_rules(device, driver, resources_to_set)) + errors.extend(deconfigure_rules(device, driver, resources_to_delete)) if len(errors) > 0: + for error in errors: LOGGER.error(error) raise OperationFailedException('ConfigureDevice', extra_details=errors) - running_config_rules = driver.GetConfig() - running_config_rules = [ - (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) - for config_rule in running_config_rules if not isinstance(config_rule[1], Exception) - ] - #for running_config_rule in running_config_rules: - # LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule))) - update_config(self.database, device_uuid, 'running', running_config_rules) - - sync_device_to_context(db_device, self.context_client) - return DeviceId(**db_device.dump_id()) + # Rules updated by configure_rules() and deconfigure_rules() methods. + # Code to be removed soon if not needed. + #running_config_rules = driver.GetConfig() + #for config_rule in running_config_rules: + # if isinstance(config_rule[1], Exception): continue + # config_rule = device.device_config.config_rules.add() + # config_rule.action = ConfigActionEnum.CONFIGACTION_SET + # config_rule.custom.resource_key = config_rule[0] + # config_rule.custom.resource_value = json.dumps(config_rule[1], sort_keys=True) + + device_id = context_client.SetDevice(device) + return device_id finally: self.mutex_queues.signal_done(device_uuid) - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: device_uuid = request.device_uuid.uuid self.mutex_queues.wait_my_turn(device_uuid) try: - self.monitoring_loops.remove(device_uuid) - - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) - if db_device is None: return Empty() - + context_client = ContextClient() + self.monitoring_loops.remove_device(device_uuid) self.driver_instance_cache.delete(device_uuid) - delete_device_from_context(db_device, self.context_client) - - for db_kpi_pk,_ in db_device.references(KpiModel): - db_kpi = get_object(self.database, KpiModel, db_kpi_pk) - for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel): - get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete() - db_kpi.delete() - - for db_endpoint_pk,_ in db_device.references(EndPointModel): - db_endpoint = EndPointModel(self.database, db_endpoint_pk) - for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel): - get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete() - db_endpoint.delete() - - for db_driver_pk,_ in db_device.references(DriverModel): - get_object(self.database, DriverModel, db_driver_pk).delete() - - db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk) - for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel): - get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() - - db_running_config = ConfigModel(self.database, db_device.device_running_config_fk) - for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel): - get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete() - - db_device.delete() - db_initial_config.delete() - db_running_config.delete() + context_client.RemoveDevice(request) return Empty() finally: self.mutex_queues.signal_done(device_uuid) @@ -292,177 +143,38 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): self.mutex_queues.wait_my_turn(device_uuid) try: - sync_device_from_context(device_uuid, self.context_client, self.database) - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = ERROR_MISSING_DRIVER.format(str(device_uuid)) + raise OperationFailedException('GetInitialConfig', extra_details=msg) + + device_config = DeviceConfig() + errors = populate_initial_config_rules(device_uuid, device_config, driver) + + if len(errors) > 0: + for error in errors: LOGGER.error(error) + raise OperationFailedException('GetInitialConfig', extra_details=errors) - config_rules = {} if db_device is None else db_device.dump_initial_config() - device_config = DeviceConfig(config_rules=config_rules) return device_config finally: self.mutex_queues.signal_done(device_uuid) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: - kpi_uuid = request.kpi_id.kpi_id.uuid device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) + manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi + self.mutex_queues.wait_my_turn(device_uuid) try: - subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) - if subscribe: - db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) - if db_device is None: - msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - endpoint_id = request.kpi_descriptor.endpoint_id - endpoint_uuid = endpoint_id.endpoint_uuid.uuid - str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid - endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid - if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: - str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) - str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') - db_endpoint : EndPointModel = get_object( - self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) - if db_endpoint is None: - msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format( - str(device_uuid), str(endpoint_uuid), str(str_endpoint_key)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - driver : _Driver = self.driver_instance_cache.get(device_uuid) - if driver is None: - msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - sample_type = request.kpi_descriptor.kpi_sample_type - - attributes = { - 'kpi_uuid' : request.kpi_id.kpi_id.uuid, - 'kpi_description' : request.kpi_descriptor.kpi_description, - 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type), - 'device_fk' : db_device, - 'endpoint_fk' : db_endpoint, - 'sampling_duration': request.sampling_duration_s, - 'sampling_interval': request.sampling_interval_s, - } - result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes) - db_kpi, updated = result - - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - db_endpoint_monitor : EndPointMonitorModel = get_object( - self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) - if db_endpoint_monitor is None: - msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format( - str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')), - str(device_uuid), str(endpoint_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') - attributes = { - 'endpoint_monitor_fk': db_endpoint_monitor, - 'kpi_fk' : db_kpi, - } - result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object( - self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes) - db_endpoint_monitor_kpi, updated = result - - resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval - resources_to_subscribe.append( - (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) - results_subscribestate = driver.SubscribeState(resources_to_subscribe) - errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate) - if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) - - self.monitoring_loops.add(device_uuid, driver) - - else: - db_kpi : KpiModel = get_object( - self.database, KpiModel, kpi_uuid, raise_if_not_found=False) - if db_kpi is None: - msg = 'Kpi({:s}) not found'.format(str(kpi_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - db_device : DeviceModel = get_object( - self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False) - if db_device is None: - msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - device_uuid = db_device.device_uuid - - db_endpoint : EndPointModel = get_object( - self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False) - if db_endpoint is None: - msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - endpoint_uuid = db_endpoint.endpoint_uuid - str_endpoint_key = db_endpoint.pk - - kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type - sample_type = kpi_sample_type.value - str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) - db_endpoint_monitor : EndPointMonitorModel = get_object( - self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) - if db_endpoint_monitor is None: - msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') - db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( - self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) - if db_endpoint_monitor_kpi is None: - msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval - resources_to_unsubscribe.append( - (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) - - driver : _Driver = self.driver_instance_cache.get(device_uuid) - if driver is None: - msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) - raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) - - results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) - errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate) - if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) - - db_endpoint_monitor_kpi.delete() - db_kpi.delete() - - # There is one monitoring loop per device; keep them active since they are re-used by different monitoring - # requests. - #self.monitoring_loops.remove(device_uuid) - - # Subscriptions are not stored as classical driver config. - # TODO: consider adding it somehow in the configuration. - # Warning: GetConfig might be very slow in OpenConfig devices - #running_config_rules = [ - # (config_rule[0], json.dumps(config_rule[1], sort_keys=True)) - # for config_rule in driver.GetConfig() - #] - #context_config_rules = { - # config_rule[1]: config_rule[2] - # for config_rule in get_config_rules(self.database, device_uuid, 'running') - #} - - ## each in context, not in running => delete in context - ## each in running, not in context => add to context - ## each in context and in running, context.value != running.value => update in context - #running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = [] - #for config_rule_key,config_rule_value in running_config_rules: - # running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value)) - # context_config_rules.pop(config_rule_key, None) - #for context_rule_key,context_rule_value in context_config_rules.items(): - # running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value)) - - ##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}' - ##for i,running_config_rules_action in enumerate(running_config_rules_actions): - ## LOGGER.info(msg.format(i, str(running_config_rules_action))) - #update_config(self.database, device_uuid, 'running', running_config_rules_actions) - - sync_device_to_context(db_device, self.context_client) + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = ERROR_MISSING_DRIVER.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + errors = manage_kpi_method(request, driver, self.monitoring_loops) + if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) + return Empty() finally: self.mutex_queues.signal_done(device_uuid) diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py deleted file mode 100644 index 18faed0d51d8d594368a0c80ef03539a9b0c4d4e..0000000000000000000000000000000000000000 --- a/src/device/service/MonitoringLoops.py +++ /dev/null @@ -1,153 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, queue, re, threading -from datetime import datetime -from typing import Dict -from common.orm.Database import Database -from common.orm.HighLevel import get_object -from common.orm.backend.Tools import key_to_str -from common.proto.monitoring_pb2 import Kpi -from monitoring.client.MonitoringClient import MonitoringClient -from .database.KpiModel import KpiModel -from .database.RelationModels import EndPointMonitorKpiModel -from .driver_api._Driver import _Driver - -LOGGER = logging.getLogger(__name__) -QUEUE_GET_WAIT_TIMEOUT = 0.5 - -class MonitoringLoop: - def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None: - self._device_uuid = device_uuid - self._driver = driver - self._samples_queue = samples_queue - self._running = threading.Event() - self._terminate = threading.Event() - self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate) - self._collector_thread = threading.Thread(target=self._collect, daemon=True) - - def _collect(self) -> None: - for sample in self._samples_stream: - if self._terminate.is_set(): break - sample = (self._device_uuid, *sample) - self._samples_queue.put_nowait(sample) - - def start(self): - self._collector_thread.start() - self._running.set() - - @property - def is_running(self): return self._running.is_set() - - def stop(self): - self._terminate.set() - self._collector_thread.join() - -class MonitoringLoops: - def __init__(self, database : Database) -> None: - self._monitoring_client = MonitoringClient() - self._database = database - self._samples_queue = queue.Queue() - self._running = threading.Event() - self._terminate = threading.Event() - self._lock = threading.Lock() - self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} - self._exporter_thread = threading.Thread(target=self._export, daemon=True) - - def add(self, device_uuid : str, driver : _Driver) -> None: - with self._lock: - monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) - if (monitoring_loop is not None) and monitoring_loop.is_running: return - monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue) - self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop - monitoring_loop.start() - - def remove(self, device_uuid : str) -> None: - with self._lock: - monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) - if monitoring_loop is None: return - if monitoring_loop.is_running: monitoring_loop.stop() - self._device_uuid__to__monitoring_loop.pop(device_uuid, None) - - def start(self): - self._exporter_thread.start() - - @property - def is_running(self): return self._running.is_set() - - def stop(self): - self._terminate.set() - self._exporter_thread.join() - - def _export(self) -> None: - if self._database is None: - LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.') - return - - self._running.set() - while not self._terminate.is_set(): - try: - sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) - #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) - except queue.Empty: - continue - - device_uuid, timestamp, endpoint_monitor_resource_key, value = sample - endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', endpoint_monitor_resource_key) - str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') - - #db_entries = self._database.dump() - #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - #for db_entry in db_entries: - # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover - #LOGGER.info('-----------------------------------------------------------') - - db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( - self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) - if db_endpoint_monitor_kpi is None: - LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key)) - continue - - str_kpi_key = db_endpoint_monitor_kpi.kpi_fk - db_kpi : KpiModel = get_object( - self._database, KpiModel, str_kpi_key, raise_if_not_found=False) - if db_kpi is None: - LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key)) - continue - - # FIXME: uint32 used for intVal results in out of range issues. Temporarily changed to float - # extend the 'kpi_value' to support long integers (uint64 / int64 / ...) - if isinstance(value, int): - kpi_value_field_name = 'int64Val' - kpi_value_field_cast = int - elif isinstance(value, float): - kpi_value_field_name = 'floatVal' - kpi_value_field_cast = float - elif isinstance(value, bool): - kpi_value_field_name = 'boolVal' - kpi_value_field_cast = bool - else: - kpi_value_field_name = 'stringVal' - kpi_value_field_cast = str - - try: - self._monitoring_client.IncludeKpi(Kpi(**{ - 'kpi_id' : {'kpi_id': {'uuid': db_kpi.kpi_uuid}}, - 'timestamp': {'timestamp': timestamp}, - 'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)} - })) - except: # pylint: disable=bare-except - LOGGER.exception('Unable to format/send Kpi') - - self._running.clear() diff --git a/src/device/service/Tools.py b/src/device/service/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..086e5a0711e01593c41e061bfdeac965b51850e5 --- /dev/null +++ b/src/device/service/Tools.py @@ -0,0 +1,286 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from typing import Any, Dict, List, Tuple +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.method_wrappers.ServiceExceptions import InvalidArgumentException +from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig +from common.proto.device_pb2 import MonitoringSettings +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.tools.grpc.Tools import grpc_message_to_json +from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS +from .monitoring.MonitoringLoops import MonitoringLoops + +ERROR_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})' +ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})' +ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})' +ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})' +ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})' +ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported' +ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ + 'error({:s})' +ERROR_MISSING_KPI = 'Device({:s}): Kpi({:s}) not found' +ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\ + 'error({:s})' + +def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: + connection_config_rules = dict() + unexpected_config_rules = list() + for config_rule in device_config.config_rules: + is_action_set = (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) + is_custom_rule = (config_rule.WhichOneof('config_rule') == 'custom') + if is_action_set and is_custom_rule and (config_rule.custom.resource_key.startswith('_connect/')): + connect_attribute = config_rule.custom.resource_key.replace('_connect/', '') + connection_config_rules[connect_attribute] = config_rule.custom.resource_value + else: + unexpected_config_rules.append(config_rule) + + if len(unexpected_config_rules) > 0: + unexpected_config_rules = grpc_message_to_json(device_config) + unexpected_config_rules = unexpected_config_rules['config_rules'] + unexpected_config_rules = list(filter( + lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules, + unexpected_config_rules)) + str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True) + raise InvalidArgumentException( + 'device.device_config.config_rules', str_unexpected_config_rules, + extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.') + + return connection_config_rules + +def get_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: + connect_rules = dict() + for config_rule in device_config.config_rules: + if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue + if config_rule.WhichOneof('config_rule') != 'custom': continue + if not config_rule.custom.resource_key.startswith('_connect/'): continue + connect_attribute = config_rule.custom.resource_key.replace('_connect/', '') + connect_rules[connect_attribute] = config_rule.custom.resource_value + return connect_rules + +def check_no_endpoints(device_endpoints) -> None: + if len(device_endpoints) == 0: return + unexpected_endpoints = [] + for device_endpoint in device_endpoints: + unexpected_endpoints.append(grpc_message_to_json(device_endpoint)) + str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True) + raise InvalidArgumentException( + 'device.device_endpoints', str_unexpected_endpoints, + extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\ + 'interrogation of the physical device.') + +def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: + device_uuid = device.device_id.device_uuid.uuid + + resources_to_get = [RESOURCE_ENDPOINTS] + results_getconfig = driver.GetConfig(resources_to_get) + + errors : List[str] = list() + for endpoint in results_getconfig: + if len(endpoint) != 2: + errors.append(ERROR_ENDPOINT.format(device_uuid, str(endpoint))) + continue + + resource_key, resource_value = endpoint + if isinstance(resource_value, Exception): + errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value))) + continue + + endpoint_uuid = resource_value.get('uuid') + + device_endpoint = device.device_endpoints.add() + device_endpoint.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + device_endpoint.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME + device_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid + device_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid + device_endpoint.endpoint_type = resource_value.get('type') + + sample_types : Dict[int, str] = resource_value.get('sample_types', {}) + for kpi_sample_type, monitor_resource_key in sample_types.items(): + device_endpoint.kpi_sample_types.append(kpi_sample_type) + monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key) + + return errors + +def populate_config_rules(device : Device, driver : _Driver) -> List[str]: + device_uuid = device.device_id.device_uuid.uuid + + resources_to_get = ['ALL'] + results_getconfig = driver.GetConfig() + + errors : List[str] = list() + for resource_key, resource_value in zip(resources_to_get, results_getconfig): + if isinstance(resource_value, Exception): + errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value))) + continue + + config_rule = device.device_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.custom.resource_key = resource_key + config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) + + return errors + +def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]: + results_getinitconfig = driver.GetInitialConfig() + + errors : List[str] = list() + for resource_key, resource_value in results_getinitconfig: + if isinstance(resource_value, Exception): + errors.append(ERROR_GET_INIT.format(device_uuid, str(resource_key), str(resource_value))) + continue + + config_rule = device_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.custom.resource_key = resource_key + config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) + + return errors + +def compute_rules_to_add_delete( + device : Device, request : Device +) -> Tuple[List[Tuple[str, Any]], List[Tuple[str, Any]]]: + # convert config rules from context into a dictionary + # TODO: add support for non-custom config rules + context_config_rules = { + config_rule.custom.resource_key: config_rule.custom.resource_value + for config_rule in device.device_config.config_rules + if config_rule.WhichOneof('config_rule') == 'custom' + } + + # convert config rules from request into a list + # TODO: add support for non-custom config rules + request_config_rules = [ + (config_rule.action, config_rule.custom.resource_key, config_rule.custom.resource_value) + for config_rule in request.device_config.config_rules + if config_rule.WhichOneof('config_rule') == 'custom' + ] + + resources_to_set : List[Tuple[str, Any]] = [] # key, value + resources_to_delete : List[Tuple[str, Any]] = [] # key, value + + for action, key, value in request_config_rules: + if action == ConfigActionEnum.CONFIGACTION_SET: + if (key in context_config_rules) and (context_config_rules[key][0] == value): continue + resources_to_set.append((key, value)) + elif action == ConfigActionEnum.CONFIGACTION_DELETE: + if key not in context_config_rules: continue + resources_to_delete.append((key, value)) + + return resources_to_set, resources_to_delete + +def configure_rules(device : Device, driver : _Driver, resources_to_set : List[Tuple[str, Any]]) -> List[str]: + device_uuid = device.device_id.device_uuid.uuid + + results_setconfig = driver.SetConfig(resources_to_set) + + errors : List[str] = list() + for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig): + if isinstance(result, Exception): + errors.append(ERROR_SET.format(device_uuid, str(resource_key), str(resource_value), str(result))) + continue + # add to config of device + config_rule = device.device_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.custom.resource_key = resource_key + config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) + + return errors + +def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]: + device_uuid = device.device_id.device_uuid.uuid + + results_deleteconfig = driver.DeleteConfig(resources_to_delete) + + errors : List[str] = list() + for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig): + if isinstance(result, Exception): + errors.append(ERROR_DELETE.format(device_uuid, str(resource_key), str(resource_value), str(result))) + continue + # remove from config of device + config_rule = device.device_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.custom.resource_key = resource_key + config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True) + + return errors + +def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: + kpi_uuid = request.kpi_id.kpi_id.uuid + device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid + kpi_sample_type = request.kpi_descriptor.kpi_sample_type + + resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) + if resource_key is None: + kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') + return [ + ERROR_SAMPLETYPE.format( + str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name) + ) + ] + + sampling_duration = request.sampling_duration_s # seconds + sampling_interval = request.sampling_interval_s # seconds + + resources_to_subscribe = [(resource_key, sampling_duration, sampling_interval)] + results_subscribestate = driver.SubscribeState(resources_to_subscribe) + + errors : List[str] = list() + for (resource_key, duration, interval), result in zip(resources_to_subscribe, results_subscribestate): + if isinstance(result, Exception): + errors.append(ERROR_SUBSCRIBE.format( + str(device_uuid), str(resource_key), str(duration), str(interval), str(result) + )) + continue + + monitoring_loops.add_kpi(device_uuid, resource_key, kpi_uuid, sampling_duration, sampling_interval) + monitoring_loops.add_device(device_uuid, driver) + + return errors + +def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]: + kpi_uuid = request.kpi_id.kpi_id.uuid + device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + #endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid + #kpi_sample_type = request.kpi_descriptor.kpi_sample_type + + # TODO: consider if further validation needs to be done (correct endpoint_uuid?, correct kpi_sample_type?) + #resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) + #if resource_key is None: + # kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') + # return [ERROR_SAMPLETYPE.format(device_uuid, endpoint_uuid, str(kpi_sample_type), str(kpi_sample_type_name))] + + kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid) + if kpi_details is None: + return [ERROR_MISSING_KPI.format(str(device_uuid), str(kpi_uuid))] + + device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details + + resources_to_unsubscribe = [(resource_key, sampling_duration, sampling_interval)] + results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) + + errors : List[str] = list() + for (resource_key, duration, interval), result in zip(resources_to_unsubscribe, results_unsubscribestate): + if isinstance(result, Exception): + errors.append(ERROR_UNSUBSCRIBE.format( + device_uuid, str(resource_key), str(duration), str(interval), str(result))) + continue + + monitoring_loops.remove_kpi(kpi_uuid) + #monitoring_loops.remove_device(device_uuid) # Do not remove; one monitoring_loop/device used by multiple requests + + return errors diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index 5c9b41531e7bc579cbe5cc563f20b193f6bc5a90..c69393fc3b9347b7bedfb579b67e79605f14714f 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -20,7 +20,7 @@ from common.Settings import ( wait_for_environment_variables) from .DeviceService import DeviceService from .driver_api.DriverFactory import DriverFactory -from .driver_api.DriverInstanceCache import DriverInstanceCache +from .driver_api.DriverInstanceCache import DriverInstanceCache, preload_drivers from .drivers import DRIVERS terminate = threading.Event() @@ -58,6 +58,9 @@ def main(): driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) + # Initialize drivers with existing devices in context + preload_drivers(driver_instance_cache) + # Starting device service grpc_service = DeviceService(driver_instance_cache) grpc_service.start() diff --git a/src/device/service/database/ConfigModel.py b/src/device/service/database/ConfigModel.py deleted file mode 100644 index 8472a44eaefefceaee36dcbe40d9a427eb2cbb36..0000000000000000000000000000000000000000 --- a/src/device/service/database/ConfigModel.py +++ /dev/null @@ -1,122 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools, logging, operator -from enum import Enum -from typing import Dict, List, Tuple, Union -from common.orm.Database import Database -from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object -from common.orm.backend.Tools import key_to_str -from common.orm.fields.EnumeratedField import EnumeratedField -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.IntegerField import IntegerField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model -from common.proto.context_pb2 import ConfigActionEnum -from common.tools.grpc.Tools import grpc_message_to_json_string -from .Tools import fast_hasher, grpc_to_enum, remove_dict_key - -LOGGER = logging.getLogger(__name__) - -class ORM_ConfigActionEnum(Enum): - UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED - SET = ConfigActionEnum.CONFIGACTION_SET - DELETE = ConfigActionEnum.CONFIGACTION_DELETE - -grpc_to_enum__config_action = functools.partial( - grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum) - -class ConfigModel(Model): # pylint: disable=abstract-method - pk = PrimaryKeyField() - - def dump(self) -> List[Dict]: - db_config_rule_pks = self.references(ConfigRuleModel) - config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks] - config_rules = sorted(config_rules, key=operator.itemgetter('position')) - return [remove_dict_key(config_rule, 'position') for config_rule in config_rules] - -class ConfigRuleModel(Model): # pylint: disable=abstract-method - pk = PrimaryKeyField() - config_fk = ForeignKeyField(ConfigModel) - position = IntegerField(min_value=0, required=True) - action = EnumeratedField(ORM_ConfigActionEnum, required=True) - key = StringField(required=True, allow_empty=False) - value = StringField(required=False, allow_empty=True) - - def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ - result = { - 'action': self.action.value, - 'custom': { - 'resource_key': self.key, - 'resource_value': self.value, - }, - } - if include_position: result['position'] = self.position - return result - -def delete_all_config_rules(database : Database, db_parent_pk : str, config_name : str) -> None: - str_config_key = key_to_str([db_parent_pk, config_name], separator=':') - db_config : ConfigModel = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False) - if db_config is None: return - db_config_rule_pks = db_config.references(ConfigRuleModel) - for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete() - -def grpc_config_rules_to_raw(grpc_config_rules) -> List[Tuple[ORM_ConfigActionEnum, str, str]]: - def translate(grpc_config_rule): - action = grpc_to_enum__config_action(grpc_config_rule.action) - config_rule_type = str(grpc_config_rule.WhichOneof('config_rule')) - if config_rule_type != 'custom': - raise NotImplementedError('ConfigRule of type {:s} is not implemented: {:s}'.format( - config_rule_type, grpc_message_to_json_string(grpc_config_rule))) - return action, grpc_config_rule.custom.resource_key, grpc_config_rule.custom.resource_value - return [translate(grpc_config_rule) for grpc_config_rule in grpc_config_rules] - -def get_config_rules( - database : Database, db_parent_pk : str, config_name : str - ) -> List[Tuple[ORM_ConfigActionEnum, str, str]]: - - str_config_key = key_to_str([db_parent_pk, config_name], separator=':') - db_config = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False) - return [] if db_config is None else [ - # pylint: disable=no-member, protected-access - (ORM_ConfigActionEnum._value2member_map_.get(config_rule['action']), - config_rule['custom']['resource_key'], config_rule['custom']['resource_value']) - for config_rule in db_config.dump() - if 'custom' in config_rule - ] - -def update_config( - database : Database, db_parent_pk : str, config_name : str, - raw_config_rules : List[Tuple[ORM_ConfigActionEnum, str, str]] -) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: - - str_config_key = key_to_str([db_parent_pk, config_name], separator=':') - result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key) - db_config, created = result - - db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)] - - for position,(action, resource_key, resource_value) in enumerate(raw_config_rules): - str_rule_key_hash = fast_hasher(resource_key) - str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') - result : Tuple[ConfigRuleModel, bool] = update_or_create_object( - database, ConfigRuleModel, str_config_rule_key, { - 'config_fk': db_config, 'position': position, 'action': action, 'key': resource_key, - 'value': resource_value, - }) - db_config_rule, updated = result - db_objects.append((db_config_rule, updated)) - - return db_objects diff --git a/src/device/service/database/ContextModel.py b/src/device/service/database/ContextModel.py deleted file mode 100644 index a609e1ba9189f5359064e6628cba6c08d353770e..0000000000000000000000000000000000000000 --- a/src/device/service/database/ContextModel.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Dict, List -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model - -LOGGER = logging.getLogger(__name__) - -class ContextModel(Model): - pk = PrimaryKeyField() - context_uuid = StringField(required=True, allow_empty=False) - - def dump_id(self) -> Dict: - return {'context_uuid': {'uuid': self.context_uuid}} - - def dump_topology_ids(self) -> List[Dict]: - from .TopologyModel import TopologyModel # pylint: disable=import-outside-toplevel - db_topology_pks = self.references(TopologyModel) - return [TopologyModel(self.database, pk).dump_id() for pk,_ in db_topology_pks] - - def dump(self, include_topologies=False) -> Dict: # pylint: disable=arguments-differ - result = {'context_id': self.dump_id()} - if include_topologies: result['topology_ids'] = self.dump_topology_ids() - return result diff --git a/src/device/service/database/DatabaseTools.py b/src/device/service/database/DatabaseTools.py deleted file mode 100644 index 9d3b712cade921849a5b34be3a837e4f6697b76f..0000000000000000000000000000000000000000 --- a/src/device/service/database/DatabaseTools.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc -from typing import Any, Dict, Tuple -from common.method_wrappers.ServiceExceptions import InvalidArgumentException -from common.orm.Database import Database -from common.orm.HighLevel import get_or_create_object, update_or_create_object -from common.orm.backend.Tools import key_to_str -from common.proto.context_pb2 import Device, DeviceId -from context.client.ContextClient import ContextClient -from device.service.driver_api.FilterFields import FilterFieldEnum -from .ConfigModel import delete_all_config_rules, grpc_config_rules_to_raw, update_config -from .ContextModel import ContextModel -from .DeviceModel import DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers -from .EndPointModel import EndPointModel, set_endpoint_monitors -from .TopologyModel import TopologyModel - -def update_device_in_local_database(database : Database, device : Device) -> Tuple[DeviceModel, bool]: - device_uuid = device.device_id.device_uuid.uuid - - for i,endpoint in enumerate(device.device_endpoints): - endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid - if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid - if device_uuid != endpoint_device_uuid: - raise InvalidArgumentException( - 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, - ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) - - initial_config_result = update_config(database, device_uuid, 'initial', []) - - config_rules = grpc_config_rules_to_raw(device.device_config.config_rules) - delete_all_config_rules(database, device_uuid, 'running') - running_config_result = update_config(database, device_uuid, 'running', config_rules) - - result : Tuple[DeviceModel, bool] = update_or_create_object(database, DeviceModel, device_uuid, { - 'device_uuid' : device_uuid, - 'device_type' : device.device_type, - 'device_operational_status': grpc_to_enum__device_operational_status(device.device_operational_status), - 'device_initial_config_fk' : initial_config_result[0][0], - 'device_running_config_fk' : running_config_result[0][0], - }) - db_device, updated = result - set_drivers(database, db_device, device.device_drivers) - - for i,endpoint in enumerate(device.device_endpoints): - endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid - endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid - if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid - - str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - endpoint_attributes = { - 'device_fk' : db_device, - 'endpoint_uuid': endpoint_uuid, - 'endpoint_type': endpoint.endpoint_type, - } - - endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid - endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid - if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: - result : Tuple[ContextModel, bool] = get_or_create_object( - database, ContextModel, endpoint_topology_context_uuid, defaults={ - 'context_uuid': endpoint_topology_context_uuid, - }) - db_context, _ = result - - str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) - result : Tuple[TopologyModel, bool] = get_or_create_object( - database, TopologyModel, str_topology_key, defaults={ - 'context_fk': db_context, - 'topology_uuid': endpoint_topology_uuid, - }) - db_topology, _ = result - - str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') - endpoint_attributes['topology_fk'] = db_topology - - result : Tuple[EndPointModel, bool] = update_or_create_object( - database, EndPointModel, str_endpoint_key, endpoint_attributes) - db_endpoint, db_endpoint_updated = result - - set_endpoint_monitors(database, db_endpoint, endpoint.kpi_sample_types) - - updated = updated or db_endpoint_updated - - return db_device, updated - -def sync_device_from_context( - device_uuid : str, context_client : ContextClient, database : Database - ) -> Tuple[DeviceModel, bool]: - - try: - device : Device = context_client.GetDevice(DeviceId(device_uuid={'uuid': device_uuid})) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - return None - return update_device_in_local_database(database, device) - -def sync_device_to_context(db_device : DeviceModel, context_client : ContextClient) -> None: - if db_device is None: return - context_client.SetDevice(Device(**db_device.dump( - include_config_rules=True, include_drivers=True, include_endpoints=True))) - -def delete_device_from_context(db_device : DeviceModel, context_client : ContextClient) -> None: - if db_device is None: return - context_client.RemoveDevice(DeviceId(**db_device.dump_id())) - -def get_device_driver_filter_fields(db_device : DeviceModel) -> Dict[FilterFieldEnum, Any]: - if db_device is None: return {} - database = db_device.database - db_driver_pks = db_device.references(DriverModel) - db_driver_names = [DriverModel(database, pk).driver.value for pk,_ in db_driver_pks] - return { - FilterFieldEnum.DEVICE_TYPE: db_device.device_type, - FilterFieldEnum.DRIVER : db_driver_names, - } diff --git a/src/device/service/database/DeviceModel.py b/src/device/service/database/DeviceModel.py deleted file mode 100644 index 9dd63d36efebf135b7bb38845d917bc9e03dc100..0000000000000000000000000000000000000000 --- a/src/device/service/database/DeviceModel.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools, logging -from enum import Enum -from typing import Dict, List -from common.orm.Database import Database -from common.orm.backend.Tools import key_to_str -from common.orm.fields.EnumeratedField import EnumeratedField -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model -from common.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum -from .ConfigModel import ConfigModel -from .Tools import grpc_to_enum - -LOGGER = logging.getLogger(__name__) - -class ORM_DeviceDriverEnum(Enum): - UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED - OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG - TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API - P4 = DeviceDriverEnum.DEVICEDRIVER_P4 - IETF_NETWORK_TOPOLOGY = DeviceDriverEnum.DEVICEDRIVER_IETF_NETWORK_TOPOLOGY - ONF_TR_352 = DeviceDriverEnum.DEVICEDRIVER_ONF_TR_352 - XR = DeviceDriverEnum.DEVICEDRIVER_XR - -grpc_to_enum__device_driver = functools.partial( - grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) - -class ORM_DeviceOperationalStatusEnum(Enum): - UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED - DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED - ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED - -grpc_to_enum__device_operational_status = functools.partial( - grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum) - -class DeviceModel(Model): - pk = PrimaryKeyField() - device_uuid = StringField(required=True, allow_empty=False) - device_type = StringField() - device_initial_config_fk = ForeignKeyField(ConfigModel) - device_running_config_fk = ForeignKeyField(ConfigModel) - device_operational_status = EnumeratedField(ORM_DeviceOperationalStatusEnum, required=True) - - def dump_id(self) -> Dict: - return {'device_uuid': {'uuid': self.device_uuid}} - - def dump_initial_config(self) -> Dict: - return ConfigModel(self.database, self.device_initial_config_fk).dump() - - def dump_running_config(self) -> Dict: - return ConfigModel(self.database, self.device_running_config_fk).dump() - - def dump_drivers(self) -> List[int]: - db_driver_pks = self.references(DriverModel) - return [DriverModel(self.database, pk).dump() for pk,_ in db_driver_pks] - - def dump_endpoints(self) -> List[Dict]: - from .EndPointModel import EndPointModel # pylint: disable=import-outside-toplevel - db_endpoints_pks = self.references(EndPointModel) - return [EndPointModel(self.database, pk).dump() for pk,_ in db_endpoints_pks] - - def dump( # pylint: disable=arguments-differ - self, include_config_rules=True, include_drivers=True, include_endpoints=True - ) -> Dict: - result = { - 'device_id': self.dump_id(), - 'device_type': self.device_type, - 'device_operational_status': self.device_operational_status.value, - } - if include_config_rules: result.setdefault('device_config', {})['config_rules'] = self.dump_running_config() - if include_drivers: result['device_drivers'] = self.dump_drivers() - if include_endpoints: result['device_endpoints'] = self.dump_endpoints() - return result - -class DriverModel(Model): # pylint: disable=abstract-method - pk = PrimaryKeyField() - device_fk = ForeignKeyField(DeviceModel) - driver = EnumeratedField(ORM_DeviceDriverEnum, required=True) - - def dump(self) -> Dict: - return self.driver.value - -def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers): - db_device_pk = db_device.pk - for driver in grpc_device_drivers: - orm_driver = grpc_to_enum__device_driver(driver) - str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) - db_device_driver = DriverModel(database, str_device_driver_key) - db_device_driver.device_fk = db_device - db_device_driver.driver = orm_driver - db_device_driver.save() diff --git a/src/device/service/database/EndPointModel.py b/src/device/service/database/EndPointModel.py deleted file mode 100644 index 3d4435737349809c527c80546ed412e621afcbdd..0000000000000000000000000000000000000000 --- a/src/device/service/database/EndPointModel.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Dict, List -from common.orm.Database import Database -from common.orm.HighLevel import update_or_create_object -from common.orm.backend.Tools import key_to_str -from common.orm.fields.EnumeratedField import EnumeratedField -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model -from .DeviceModel import DeviceModel -from .KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type -from .TopologyModel import TopologyModel - -LOGGER = logging.getLogger(__name__) - -class EndPointModel(Model): - pk = PrimaryKeyField() - topology_fk = ForeignKeyField(TopologyModel, required=False) - device_fk = ForeignKeyField(DeviceModel) - endpoint_uuid = StringField(required=True, allow_empty=False) - endpoint_type = StringField() - - def dump_id(self) -> Dict: - device_id = DeviceModel(self.database, self.device_fk).dump_id() - result = { - 'device_id': device_id, - 'endpoint_uuid': {'uuid': self.endpoint_uuid}, - } - if self.topology_fk is not None: - result['topology_id'] = TopologyModel(self.database, self.topology_fk).dump_id() - return result - - def dump_kpi_sample_types(self) -> List[int]: - db_kpi_sample_type_pks = self.references(EndPointMonitorModel) - return [EndPointMonitorModel(self.database, pk).dump() for pk,_ in db_kpi_sample_type_pks] - - def dump( # pylint: disable=arguments-differ - self, include_kpi_sample_types=True - ) -> Dict: - result = { - 'endpoint_id': self.dump_id(), - 'endpoint_type': self.endpoint_type, - } - if include_kpi_sample_types: result['kpi_sample_types'] = self.dump_kpi_sample_types() - return result - -class EndPointMonitorModel(Model): # pylint: disable=abstract-method - pk = PrimaryKeyField() - endpoint_fk = ForeignKeyField(EndPointModel) - resource_key = StringField(required=True, allow_empty=True) - kpi_sample_type = EnumeratedField(ORM_KpiSampleTypeEnum, required=True) - - def dump(self) -> Dict: - return self.kpi_sample_type.value - -def set_endpoint_monitors(database : Database, db_endpoint : EndPointModel, grpc_endpoint_kpi_sample_types): - db_endpoint_pk = db_endpoint.pk - for kpi_sample_type in grpc_endpoint_kpi_sample_types: - orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) - str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, str(orm_kpi_sample_type.value)]) - update_or_create_object(database, EndPointMonitorModel, str_endpoint_kpi_sample_type_key, { - 'endpoint_fk' : db_endpoint, - 'kpi_sample_type': orm_kpi_sample_type, - }) diff --git a/src/device/service/database/KpiModel.py b/src/device/service/database/KpiModel.py deleted file mode 100644 index e3631d38099c02cd459af7f8393b6991c476bd92..0000000000000000000000000000000000000000 --- a/src/device/service/database/KpiModel.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Dict -from common.orm.fields.EnumeratedField import EnumeratedField -from common.orm.fields.FloatField import FloatField -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model -from .DeviceModel import DeviceModel -from .EndPointModel import EndPointModel -from .KpiSampleType import ORM_KpiSampleTypeEnum - -LOGGER = logging.getLogger(__name__) - -class KpiModel(Model): - pk = PrimaryKeyField() - kpi_uuid = StringField(required=True, allow_empty=False) - kpi_description = StringField(required=False, allow_empty=True) - kpi_sample_type = EnumeratedField(ORM_KpiSampleTypeEnum, required=True) - device_fk = ForeignKeyField(DeviceModel) - endpoint_fk = ForeignKeyField(EndPointModel) - sampling_duration = FloatField(min_value=0, required=True) - sampling_interval = FloatField(min_value=0, required=True) - - def dump_id(self) -> Dict: - return {'kpi_id': {'uuid': self.kpi_uuid}} - - def dump_descriptor(self) -> Dict: - result = { - 'kpi_description': self.kpi_description, - 'kpi_sample_type': self.kpi_sample_type.value, - } - if self.device_fk is not None: - result['device_id'] = DeviceModel(self.database, self.device_fk).dump_id() - if self.endpoint_fk is not None: - result['endpoint_id'] = EndPointModel(self.database, self.endpoint_fk).dump_id() - return result - - def dump(self) -> Dict: - return { - 'kpi_id': self.dump_id(), - 'kpi_descriptor': self.dump_descriptor(), - 'sampling_duration_s': self.sampling_duration, - 'sampling_interval_s': self.sampling_interval, - } diff --git a/src/device/service/database/KpiSampleType.py b/src/device/service/database/KpiSampleType.py deleted file mode 100644 index 0a2015b3fdeaceeed8b01619805f55f2a9267468..0000000000000000000000000000000000000000 --- a/src/device/service/database/KpiSampleType.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -from enum import Enum -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from .Tools import grpc_to_enum - -class ORM_KpiSampleTypeEnum(Enum): - UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED - -grpc_to_enum__kpi_sample_type = functools.partial( - grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) diff --git a/src/device/service/database/Tools.py b/src/device/service/database/Tools.py deleted file mode 100644 index 43bb71bd90582644c67d3ca528611eae937b6460..0000000000000000000000000000000000000000 --- a/src/device/service/database/Tools.py +++ /dev/null @@ -1,72 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import hashlib, re -from enum import Enum -from typing import Dict, List, Tuple, Union - -# Convenient helper function to remove dictionary items in dict/list/set comprehensions. - -def remove_dict_key(dictionary : Dict, key : str): - dictionary.pop(key, None) - return dictionary - -# Enumeration classes are redundant with gRPC classes, but gRPC does not provide a programmatical method to retrieve -# the values it expects from strings containing the desired value symbol or its integer value, so a kind of mapping is -# required. Besides, ORM Models expect Enum classes in EnumeratedFields; we create specific and conveniently defined -# Enum classes to serve both purposes. - -def grpc_to_enum(grpc_enum_class, orm_enum_class : Enum, grpc_enum_value): - grpc_enum_name = grpc_enum_class.Name(grpc_enum_value) - grpc_enum_prefix = orm_enum_class.__name__.upper() - grpc_enum_prefix = re.sub(r'^ORM_(.+)$', r'\1', grpc_enum_prefix) - grpc_enum_prefix = re.sub(r'^(.+)ENUM$', r'\1', grpc_enum_prefix) - grpc_enum_prefix = grpc_enum_prefix + '_' - orm_enum_name = grpc_enum_name.replace(grpc_enum_prefix, '') - orm_enum_value = orm_enum_class._member_map_.get(orm_enum_name) # pylint: disable=protected-access - return orm_enum_value - -# For some models, it is convenient to produce a string hash for fast comparisons of existence or modification. Method -# fast_hasher computes configurable length (between 1 and 64 byte) hashes and retrieves them in hex representation. - -FASTHASHER_ITEM_ACCEPTED_FORMAT = 'Union[bytes, str]' -FASTHASHER_DATA_ACCEPTED_FORMAT = 'Union[{fmt:s}, List[{fmt:s}], Tuple[{fmt:s}]]'.format( - fmt=FASTHASHER_ITEM_ACCEPTED_FORMAT) - -def fast_hasher(data : Union[bytes, str, List[Union[bytes, str]], Tuple[Union[bytes, str]]], digest_size : int = 8): - hasher = hashlib.blake2b(digest_size=digest_size) - # Do not accept sets, dicts, or other unordered dats tructures since their order is arbitrary thus producing - # different hashes depending on the order. Consider adding support for sets or dicts with previous sorting of - # items by their key. - - if isinstance(data, bytes): - data = [data] - elif isinstance(data, str): - data = [data.encode('UTF-8')] - elif isinstance(data, (list, tuple)): - pass - else: - msg = 'data({:s}) must be {:s}, found {:s}' - raise TypeError(msg.format(str(data), FASTHASHER_DATA_ACCEPTED_FORMAT, str(type(data)))) - - for i,item in enumerate(data): - if isinstance(item, str): - item = item.encode('UTF-8') - elif isinstance(item, bytes): - pass - else: - msg = 'data[{:d}]({:s}) must be {:s}, found {:s}' - raise TypeError(msg.format(i, str(item), FASTHASHER_ITEM_ACCEPTED_FORMAT, str(type(item)))) - hasher.update(item) - return hasher.hexdigest() diff --git a/src/device/service/database/TopologyModel.py b/src/device/service/database/TopologyModel.py deleted file mode 100644 index f9e9c0b1a26fdf8faca7e1cbe0a64b582bdd4d5d..0000000000000000000000000000000000000000 --- a/src/device/service/database/TopologyModel.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Dict -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.fields.StringField import StringField -from common.orm.model.Model import Model -from .ContextModel import ContextModel - -LOGGER = logging.getLogger(__name__) - -class TopologyModel(Model): - pk = PrimaryKeyField() - context_fk = ForeignKeyField(ContextModel) - topology_uuid = StringField(required=True, allow_empty=False) - - def dump_id(self) -> Dict: - context_id = ContextModel(self.database, self.context_fk).dump_id() - return { - 'context_id': context_id, - 'topology_uuid': {'uuid': self.topology_uuid}, - } - - def dump(self) -> Dict: - result = {'topology_id': self.dump_id()} - return result diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py index 41cc66363885e28082aa353ec46950fbf6ce10e0..29fecf36ff00031de393b30b3d9f6eef3b0c5343 100644 --- a/src/device/service/driver_api/DriverInstanceCache.py +++ b/src/device/service/driver_api/DriverInstanceCache.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, threading +import json, logging, threading from typing import Any, Dict, Optional +from common.method_wrappers.ServiceExceptions import InvalidArgumentException +from common.proto.context_pb2 import Device, Empty +from context.client.ContextClient import ContextClient +from device.service.Tools import get_connect_rules from ._Driver import _Driver from .DriverFactory import DriverFactory from .Exceptions import DriverInstanceCacheTerminatedException -from .FilterFields import FilterFieldEnum +from .FilterFields import FilterFieldEnum, get_device_driver_filter_fields LOGGER = logging.getLogger(__name__) @@ -30,7 +34,8 @@ class DriverInstanceCache: def get( self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any] = {}, address : Optional[str] = None, - port : Optional[int] = None, settings : Dict[str, Any] = {}) -> _Driver: + port : Optional[int] = None, settings : Dict[str, Any] = {} + ) -> _Driver: if self._terminate.is_set(): raise DriverInstanceCacheTerminatedException() @@ -61,10 +66,44 @@ class DriverInstanceCache: self._terminate.set() with self._lock: while len(self._device_uuid__to__driver_instance) > 0: + device_uuid,device_driver = self._device_uuid__to__driver_instance.popitem() try: - device_uuid,device_driver = self._device_uuid__to__driver_instance.popitem() device_driver.Disconnect() except: # pylint: disable=bare-except msg = 'Error disconnecting Driver({:s}) from device. Will retry later...' LOGGER.exception(msg.format(device_uuid)) + # re-adding to retry disconnect self._device_uuid__to__driver_instance[device_uuid] = device_driver + +def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) -> _Driver: + device_uuid = device.device_id.device_uuid.uuid + + driver : _Driver = driver_instance_cache.get(device_uuid) + if driver is not None: return driver + + driver_filter_fields = get_device_driver_filter_fields(device) + connect_rules = get_connect_rules(device.device_config) + + #LOGGER.info('[get_driver] connect_rules = {:s}'.format(str(connect_rules))) + address = connect_rules.get('address', '127.0.0.1') + port = connect_rules.get('port', '0') + settings = connect_rules.get('settings', '{}') + + try: + settings = json.loads(settings) + except ValueError as e: + raise InvalidArgumentException( + 'device.device_config.config_rules[settings]', settings, + extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.' + ) from e + + driver : _Driver = driver_instance_cache.get( + device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) + driver.Connect() + + return driver + +def preload_drivers(driver_instance_cache : DriverInstanceCache) -> None: + context_client = ContextClient() + devices = context_client.ListDevices(Empty()) + for device in devices.devices: get_driver(driver_instance_cache, device) diff --git a/src/device/service/driver_api/FilterFields.py b/src/device/service/driver_api/FilterFields.py index 9ea5445903958286d68ff3246e0801e0a7955d2a..ba277e5236d141e170e1d3988403f4a28c623860 100644 --- a/src/device/service/driver_api/FilterFields.py +++ b/src/device/service/driver_api/FilterFields.py @@ -13,8 +13,9 @@ # limitations under the License. from enum import Enum +from typing import Any, Dict, Optional from common.DeviceTypes import DeviceTypeEnum -from device.service.database.DeviceModel import ORM_DeviceDriverEnum +from common.proto.context_pb2 import Device, DeviceDriverEnum class FilterFieldEnum(Enum): DEVICE_TYPE = 'device_type' @@ -26,8 +27,15 @@ class FilterFieldEnum(Enum): # Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified FILTER_FIELD_ALLOWED_VALUES = { FilterFieldEnum.DEVICE_TYPE.value : {i.value for i in DeviceTypeEnum}, - FilterFieldEnum.DRIVER.value : {i.value for i in ORM_DeviceDriverEnum}, + FilterFieldEnum.DRIVER.value : set(DeviceDriverEnum.values()), FilterFieldEnum.VENDOR.value : None, FilterFieldEnum.MODEL.value : None, FilterFieldEnum.SERIAL_NUMBER.value : None, } + +def get_device_driver_filter_fields(device : Optional[Device]) -> Dict[FilterFieldEnum, Any]: + if device is None: return {} + return { + FilterFieldEnum.DEVICE_TYPE: device.device_type, + FilterFieldEnum.DRIVER : [driver for driver in device.device_drivers], + } diff --git a/src/device/service/driver_api/Tools.py b/src/device/service/driver_api/Tools.py deleted file mode 100644 index 19c81d89bfe7e7e1bd46edb205eaf1f2b4bee778..0000000000000000000000000000000000000000 --- a/src/device/service/driver_api/Tools.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import operator -from typing import Any, Callable, List, Tuple, Union - -ACTION_MSG_GET = 'Get resource(key={:s})' -ACTION_MSG_SET = 'Set resource(key={:s}, value={:s})' -ACTION_MSG_DELETE = 'Delete resource(key={:s}, value={:s})' -ACTION_MSG_SUBSCRIBE = 'Subscribe subscription(key={:s}, duration={:s}, interval={:s})' -ACTION_MSG_UNSUBSCRIBE = 'Unsubscribe subscription(key={:s}, duration={:s}, interval={:s})' - -def _get(resource_key : str): - return ACTION_MSG_GET.format(str(resource_key)) - -def _set(resource : Tuple[str, Any]): - return ACTION_MSG_SET.format(*tuple(map(str, resource))) - -def _delete(resource : Tuple[str, Any]): - return ACTION_MSG_SET.format(*tuple(map(str, resource))) - -def _subscribe(subscription : Tuple[str, float, float]): - return ACTION_MSG_SUBSCRIBE.format(*tuple(map(str, subscription))) - -def _unsubscribe(subscription : Tuple[str, float, float]): - return ACTION_MSG_UNSUBSCRIBE.format(*tuple(map(str, subscription))) - -def _check_errors( - error_func : Callable, parameters_list : List[Any], results_list : List[Union[bool, Exception]] - ) -> List[str]: - errors = [] - for parameters, results in zip(parameters_list, results_list): - if not isinstance(results, Exception): continue - errors.append('Unable to {:s}; error({:s})'.format(error_func(parameters), str(results))) - return errors - -def check_get_errors( - resource_keys : List[str], results : List[Tuple[str, Union[Any, None, Exception]]] - ) -> List[str]: - return _check_errors(_get, resource_keys, map(operator.itemgetter(1), results)) - -def check_set_errors( - resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]] - ) -> List[str]: - return _check_errors(_set, resources, results) - -def check_delete_errors( - resources : List[Tuple[str, Any]], results : List[Union[bool, Exception]] - ) -> List[str]: - return _check_errors(_delete, resources, results) - -def check_subscribe_errors( - subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] - ) -> List[str]: - return _check_errors(_subscribe, subscriptions, results) - -def check_unsubscribe_errors( - subscriptions : List[Tuple[str, float, float]], results : List[Union[bool, Exception]] - ) -> List[str]: - return _check_errors(_unsubscribe, subscriptions, results) diff --git a/src/device/service/database/RelationModels.py b/src/device/service/drivers/emulated/Constants.py similarity index 55% rename from src/device/service/database/RelationModels.py rename to src/device/service/drivers/emulated/Constants.py index 0f6caa646f7548fe0d4aa23829183a132069c589..1c148c02b2a802b75c133f0b0c4ea20438190044 100644 --- a/src/device/service/database/RelationModels.py +++ b/src/device/service/drivers/emulated/Constants.py @@ -12,16 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from common.orm.fields.ForeignKeyField import ForeignKeyField -from common.orm.fields.PrimaryKeyField import PrimaryKeyField -from common.orm.model.Model import Model -from .EndPointModel import EndPointMonitorModel -from .KpiModel import KpiModel +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES -LOGGER = logging.getLogger(__name__) - -class EndPointMonitorKpiModel(Model): # pylint: disable=abstract-method - pk = PrimaryKeyField() - endpoint_monitor_fk = ForeignKeyField(EndPointMonitorModel) - kpi_fk = ForeignKeyField(KpiModel) +SPECIAL_RESOURCE_MAPPINGS = { + RESOURCE_ENDPOINTS : '/endpoints', + RESOURCE_INTERFACES : '/interfaces', + RESOURCE_NETWORK_INSTANCES: '/net-instances', +} diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py index 6029ff6604b2525b4509a24a2ec0d6f7c38513d0..4f5effce0a8b6156ce99a73b49b71f157d891286 100644 --- a/src/device/service/drivers/emulated/EmulatedDriver.py +++ b/src/device/service/drivers/emulated/EmulatedDriver.py @@ -12,117 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anytree, json, logging, math, pytz, queue, random, re, threading +import anytree, json, logging, pytz, queue, re, threading from datetime import datetime, timedelta -from typing import Any, Dict, Iterator, List, Optional, Tuple, Union +from typing import Any, Iterator, List, Optional, Tuple, Union from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type -from device.service.database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type -from device.service.driver_api._Driver import ( - RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, - _Driver) +from device.service.driver_api._Driver import _Driver from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value +from .Constants import SPECIAL_RESOURCE_MAPPINGS +from .SyntheticSamplingParameters import SyntheticSamplingParameters, do_sampling +from .Tools import compose_resource_endpoint LOGGER = logging.getLogger(__name__) -SPECIAL_RESOURCE_MAPPINGS = { - RESOURCE_ENDPOINTS : '/endpoints', - RESOURCE_INTERFACES : '/interfaces', - RESOURCE_NETWORK_INSTANCES: '/net-instances', -} - -def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]: - endpoint_uuid = endpoint_data.get('uuid') - if endpoint_uuid is None: return None - endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) - endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) - - endpoint_type = endpoint_data.get('type') - if endpoint_type is None: return None - - endpoint_sample_types = endpoint_data.get('sample_types') - if endpoint_sample_types is None: return None - sample_types = {} - for endpoint_sample_type in endpoint_sample_types: - try: - kpi_sample_type : ORM_KpiSampleTypeEnum = grpc_to_enum__kpi_sample_type(endpoint_sample_type) - except: # pylint: disable=bare-except - LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format( - str(endpoint_sample_type), str(endpoint_data))) - continue - metric_name = kpi_sample_type.name.lower() - monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name) - sample_types[endpoint_sample_type] = monitoring_resource_key - - endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types} - return endpoint_resource_key, endpoint_resource_value - -RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)') RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'.*\/interface\[([^\]]+)\].*') -class SyntheticSamplingParameters: - def __init__(self) -> None: - self.__lock = threading.Lock() - self.__data = {} - self.__configured_endpoints = set() - - def set_endpoint_configured(self, endpoint_uuid : str): - with self.__lock: - self.__configured_endpoints.add(endpoint_uuid) - - def unset_endpoint_configured(self, endpoint_uuid : str): - with self.__lock: - self.__configured_endpoints.discard(endpoint_uuid) - - def get(self, resource_key : str) -> Tuple[float, float, float, float]: - with self.__lock: - match = RE_GET_ENDPOINT_METRIC.match(resource_key) - if match is None: - msg = '[SyntheticSamplingParameters:get] unable to extract endpoint-metric from resource_key "{:s}"' - LOGGER.error(msg.format(resource_key)) - return (0, 0, 1, 0, 0) - endpoint_uuid = match.group(1) - - # If endpoint is not configured, generate a flat synthetic traffic aligned at 0 - if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0) - - metric = match.group(2) - metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '') - - msg = '[SyntheticSamplingParameters:get] resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}' - LOGGER.info(msg.format(resource_key, endpoint_uuid, metric, metric_sense)) - - parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense) - parameters = self.__data.get(parameters_key) - if parameters is not None: return parameters - - # assume packets - amplitude = 1.e7 * random.random() - phase = 60 * random.random() - period = 3600 * random.random() - offset = 1.e8 * random.random() + amplitude - avg_bytes_per_packet = random.randint(500, 1500) - parameters = (amplitude, phase, period, offset, avg_bytes_per_packet) - return self.__data.setdefault(parameters_key, parameters) - -def do_sampling( - synthetic_sampling_parameters : SyntheticSamplingParameters, resource_key : str, out_samples : queue.Queue - ): - amplitude, phase, period, offset, avg_bytes_per_packet = synthetic_sampling_parameters.get(resource_key) - if 'bytes' in resource_key.lower(): - # convert to bytes - amplitude = avg_bytes_per_packet * amplitude - offset = avg_bytes_per_packet * offset - timestamp = datetime.timestamp(datetime.utcnow()) - waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset - noise = amplitude * random.random() - value = abs(0.95 * waveform + 0.05 * noise) - out_samples.put_nowait((timestamp, resource_key, value)) - HISTOGRAM_BUCKETS = ( # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF 0.0001, 0.00025, 0.00050, 0.00075, @@ -240,7 +148,7 @@ class EmulatedDriver(_Driver): try: resource_value = json.loads(resource_value) - except: # pylint: disable=broad-except + except: # pylint: disable=bare-except pass set_subnode_value(resolver, self.__running, resource_path, resource_value) diff --git a/src/device/service/drivers/emulated/SyntheticSamplingParameters.py b/src/device/service/drivers/emulated/SyntheticSamplingParameters.py new file mode 100644 index 0000000000000000000000000000000000000000..65feb9d16e72dd55f8f7ffdf5a2e1bee11f94c81 --- /dev/null +++ b/src/device/service/drivers/emulated/SyntheticSamplingParameters.py @@ -0,0 +1,86 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, math, queue, random, re, threading +from datetime import datetime +from typing import Optional, Tuple + +LOGGER = logging.getLogger(__name__) + +RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)') + +MSG_ERROR_PARSE = '[get] unable to extract endpoint-metric from monitoring_resource_key "{:s}"' +MSG_INFO = '[get] monitoring_resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}' + +class SyntheticSamplingParameters: + def __init__(self) -> None: + self.__lock = threading.Lock() + self.__data = {} + self.__configured_endpoints = set() + + def set_endpoint_configured(self, endpoint_uuid : str): + with self.__lock: + self.__configured_endpoints.add(endpoint_uuid) + + def unset_endpoint_configured(self, endpoint_uuid : str): + with self.__lock: + self.__configured_endpoints.discard(endpoint_uuid) + + def get(self, monitoring_resource_key : str) -> Optional[Tuple[float, float, float, float, float]]: + with self.__lock: + match = RE_GET_ENDPOINT_METRIC.match(monitoring_resource_key) + if match is None: + LOGGER.error(MSG_ERROR_PARSE.format(monitoring_resource_key)) + return None + endpoint_uuid = match.group(1) + + # If endpoint is not configured, generate a flat synthetic traffic aligned at 0 + if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0) + + metric = match.group(2) + metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '') + + LOGGER.info(MSG_INFO.format(monitoring_resource_key, endpoint_uuid, metric, metric_sense)) + + parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense) + parameters = self.__data.get(parameters_key) + if parameters is not None: return parameters + + # assume packets + amplitude = 1.e7 * random.random() + phase = 60 * random.random() + period = 3600 * random.random() + offset = 1.e8 * random.random() + amplitude + avg_bytes_per_packet = random.randint(500, 1500) + parameters = (amplitude, phase, period, offset, avg_bytes_per_packet) + return self.__data.setdefault(parameters_key, parameters) + +def do_sampling( + synthetic_sampling_parameters : SyntheticSamplingParameters, monitoring_resource_key : str, + out_samples : queue.Queue +) -> None: + parameters = synthetic_sampling_parameters.get(monitoring_resource_key) + if parameters is None: return + amplitude, phase, period, offset, avg_bytes_per_packet = parameters + + if 'bytes' in monitoring_resource_key.lower(): + # convert to bytes + amplitude = avg_bytes_per_packet * amplitude + offset = avg_bytes_per_packet * offset + + timestamp = datetime.timestamp(datetime.utcnow()) + waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset + noise = amplitude * random.random() + value = abs(0.95 * waveform + 0.05 * noise) + out_samples.put_nowait((timestamp, monitoring_resource_key, value)) diff --git a/src/device/service/drivers/emulated/Tools.py b/src/device/service/drivers/emulated/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..14672c203fd86da46c2ac5ddda39860ab67e68db --- /dev/null +++ b/src/device/service/drivers/emulated/Tools.py @@ -0,0 +1,46 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, Dict, Tuple +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS +from .Constants import SPECIAL_RESOURCE_MAPPINGS + +LOGGER = logging.getLogger(__name__) + +def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]: + endpoint_uuid = endpoint_data.get('uuid') + if endpoint_uuid is None: return None + endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) + endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) + + endpoint_type = endpoint_data.get('type') + if endpoint_type is None: return None + + endpoint_sample_types = endpoint_data.get('sample_types') + if endpoint_sample_types is None: return None + + sample_types = {} + for endpoint_sample_type in endpoint_sample_types: + try: + metric_name = KpiSampleType.Name(endpoint_sample_type).lower().replace('kpisampletype_', '') + except: # pylint: disable=bare-except + LOGGER.warning('Unsupported EndPointSampleType({:s})'.format(str(endpoint_sample_type))) + continue + monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name) + sample_types[endpoint_sample_type] = monitoring_resource_key + + endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types} + return endpoint_resource_key, endpoint_resource_value diff --git a/src/device/service/drivers/openconfig/templates/EndPoints.py b/src/device/service/drivers/openconfig/templates/EndPoints.py index 9bd2e75ac4da0965c91b9154046694fd352dc4f6..e831d7738b3a09ae99773e1b882650554cfe5d78 100644 --- a/src/device/service/drivers/openconfig/templates/EndPoints.py +++ b/src/device/service/drivers/openconfig/templates/EndPoints.py @@ -14,7 +14,7 @@ import logging, lxml.etree as ET from typing import Any, Dict, List, Tuple -from device.service.database.KpiSampleType import ORM_KpiSampleTypeEnum +from common.proto.kpi_sample_types_pb2 import KpiSampleType from .Namespace import NAMESPACES from .Tools import add_value_from_collection, add_value_from_tag @@ -47,10 +47,10 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: if 'type' not in endpoint: endpoint['type'] = '-' sample_types = { - ORM_KpiSampleTypeEnum.BYTES_RECEIVED.value : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'in-octets' ), - ORM_KpiSampleTypeEnum.BYTES_TRANSMITTED.value : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'out-octets'), - ORM_KpiSampleTypeEnum.PACKETS_RECEIVED.value : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'in-pkts' ), - ORM_KpiSampleTypeEnum.PACKETS_TRANSMITTED.value: XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'out-pkts' ), + KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'in-octets' ), + KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'out-octets'), + KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED : XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'in-pkts' ), + KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: XPATH_IFACE_COUNTER.format(endpoint['uuid'], 'out-pkts' ), } add_value_from_collection(endpoint, 'sample_types', sample_types) diff --git a/src/device/service/monitoring/MonitoringLoop.py b/src/device/service/monitoring/MonitoringLoop.py new file mode 100644 index 0000000000000000000000000000000000000000..ec17a3ef6763f52831284b8af5bcbed534755525 --- /dev/null +++ b/src/device/service/monitoring/MonitoringLoop.py @@ -0,0 +1,43 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import queue, threading +from device.service.driver_api._Driver import _Driver + +class MonitoringLoop: + def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None: + self._device_uuid = device_uuid + self._driver = driver + self._samples_queue = samples_queue + self._running = threading.Event() + self._terminate = threading.Event() + self._samples_stream = self._driver.GetState(blocking=True, terminate=self._terminate) + self._collector_thread = threading.Thread(target=self._collect, daemon=True) + + def _collect(self) -> None: + for sample in self._samples_stream: + if self._terminate.is_set(): break + sample = (self._device_uuid, *sample) + self._samples_queue.put_nowait(sample) + + def start(self): + self._collector_thread.start() + self._running.set() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._collector_thread.join() diff --git a/src/device/service/monitoring/MonitoringLoops.py b/src/device/service/monitoring/MonitoringLoops.py new file mode 100644 index 0000000000000000000000000000000000000000..5763951fb2075e1975688eda0e49d24e10b0f697 --- /dev/null +++ b/src/device/service/monitoring/MonitoringLoops.py @@ -0,0 +1,170 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, queue, threading +from typing import Dict, Optional, Tuple, Union +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.monitoring_pb2 import Kpi +from monitoring.client.MonitoringClient import MonitoringClient +from ..driver_api._Driver import _Driver +from .MonitoringLoop import MonitoringLoop + +LOGGER = logging.getLogger(__name__) + +QUEUE_GET_WAIT_TIMEOUT = 0.5 + +def value_to_grpc(value : Union[bool, float, int, str]) -> Dict: + if isinstance(value, int): + kpi_value_field_name = 'int64Val' + kpi_value_field_cast = int + elif isinstance(value, float): + kpi_value_field_name = 'floatVal' + kpi_value_field_cast = float + elif isinstance(value, bool): + kpi_value_field_name = 'boolVal' + kpi_value_field_cast = bool + else: + kpi_value_field_name = 'stringVal' + kpi_value_field_cast = str + + return {kpi_value_field_name: kpi_value_field_cast(value)} + +TYPE_TARGET_KEY = Tuple[str, str] # (device_uuid, monitoring_resource_key) +TYPE_TARGET_KPI = Tuple[str, float, float] # (kpi_uuid, sampling_duration, sampling_interval) +TYPE_KPI_DETAIL = Tuple[str, str, float, float] # (device_uuid, monitoring_resource_key, + # sampling_duration, sampling_interval) + +class MonitoringLoops: + def __init__(self) -> None: + self._monitoring_client = MonitoringClient() + self._samples_queue = queue.Queue() + self._running = threading.Event() + self._terminate = threading.Event() + + self._lock_device_endpoint = threading.Lock() + self._device_endpoint_sampletype__to__resource_key : Dict[Tuple[str, str, int], str] = dict() + + self._lock_monitoring_loop = threading.Lock() + self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = dict() + + self._lock_kpis = threading.Lock() + self._target_to_kpi : Dict[TYPE_TARGET_KEY, TYPE_TARGET_KPI] = dict() + self._kpi_to_detail : Dict[str, TYPE_KPI_DETAIL] = dict() + + self._exporter_thread = threading.Thread(target=self._export, daemon=True) + + def add_device(self, device_uuid : str, driver : _Driver) -> None: + with self._lock_monitoring_loop: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if (monitoring_loop is not None) and monitoring_loop.is_running: return + monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue) + self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop + monitoring_loop.start() + + def remove_device(self, device_uuid : str) -> None: + with self._lock_monitoring_loop: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if monitoring_loop is None: return + if monitoring_loop.is_running: monitoring_loop.stop() + self._device_uuid__to__monitoring_loop.pop(device_uuid, None) + + def add_resource_key( + self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType, resource_key : str + ) -> None: + with self._lock_device_endpoint: + key = (device_uuid, endpoint_uuid, kpi_sample_type) + self._device_endpoint_sampletype__to__resource_key[key] = resource_key + + def get_resource_key( + self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType + ) -> Optional[str]: + with self._lock_device_endpoint: + key = (device_uuid, endpoint_uuid, kpi_sample_type) + return self._device_endpoint_sampletype__to__resource_key.get(key) + + def remove_resource_key( + self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType + ) -> None: + with self._lock_device_endpoint: + key = (device_uuid, endpoint_uuid, kpi_sample_type) + self._device_endpoint_sampletype__to__resource_key.pop(key, None) + + def add_kpi( + self, device_uuid : str, monitoring_resource_key : str, kpi_uuid : str, sampling_duration : float, + sampling_interval : float + ) -> None: + with self._lock_kpis: + kpi_key = (device_uuid, monitoring_resource_key) + kpi_values = (kpi_uuid, sampling_duration, sampling_interval) + self._target_to_kpi[kpi_key] = kpi_values + + kpi_details = (device_uuid, monitoring_resource_key, sampling_duration, sampling_interval) + self._kpi_to_detail[kpi_uuid] = kpi_details + + def get_kpi_by_uuid(self, kpi_uuid : str) -> Optional[TYPE_KPI_DETAIL]: + with self._lock_kpis: + return self._kpi_to_detail.get(kpi_uuid) + + def get_kpi_by_metric( + self, device_uuid : str, monitoring_resource_key : str + ) -> Optional[TYPE_TARGET_KPI]: + with self._lock_kpis: + kpi_key = (device_uuid, monitoring_resource_key) + return self._target_to_kpi.get(kpi_key) + + def remove_kpi(self, kpi_uuid : str) -> None: + with self._lock_kpis: + kpi_details = self._kpi_to_detail.pop(kpi_uuid, None) + if kpi_details is None: return + kpi_key = kpi_details[0:2] # (device_uuid, monitoring_resource_key, _, _) + self._target_to_kpi.pop(kpi_key, None) + + def start(self): + self._exporter_thread.start() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._exporter_thread.join() + + def _export(self) -> None: + self._running.set() + while not self._terminate.is_set(): + try: + sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) + #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) + except queue.Empty: + continue + + device_uuid, timestamp, monitoring_resource_key, value = sample + + kpi_details = self.get_kpi_by_metric(device_uuid, monitoring_resource_key) + if kpi_details is None: + MSG = 'Kpi for Device({:s})/MonitoringResourceKey({:s}) not found' + LOGGER.warning(MSG.format(str(device_uuid), str(monitoring_resource_key))) + continue + kpi_uuid,_,_ = kpi_details + + try: + self._monitoring_client.IncludeKpi(Kpi(**{ + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'timestamp': {'timestamp': timestamp}, + 'kpi_value': value_to_grpc(value), + })) + except: # pylint: disable=bare-except + LOGGER.exception('Unable to format/send Kpi') + + self._running.clear() diff --git a/src/device/service/database/__init__.py b/src/device/service/monitoring/__init__.py similarity index 75% rename from src/device/service/database/__init__.py rename to src/device/service/monitoring/__init__.py index c59423e79961c8503f4469d69c53946988cae24e..70a33251242c51f49140e596b8208a19dd5245f7 100644 --- a/src/device/service/database/__init__.py +++ b/src/device/service/monitoring/__init__.py @@ -12,5 +12,3 @@ # See the License for the specific language governing permissions and # limitations under the License. -# In-Memory database with a simplified representation of Context Database focused on the Device model. -# Used as an internal configuration cache, for message validation, and message formatting purposes.