diff --git a/src/context/service/database/ConfigModel.py b/src/context/service/database/ConfigModel.py index 0fe3484896a39545c4dd49042707dc1ee09fc868..8754da786ae4180378c66cf3191bdeace92878ea 100644 --- a/src/context/service/database/ConfigModel.py +++ b/src/context/service/database/ConfigModel.py @@ -11,7 +11,7 @@ from common.orm.fields.PrimaryKeyField import PrimaryKeyField from common.orm.fields.StringField import StringField from common.orm.model.Model import Model from context.proto.context_pb2 import ConfigActionEnum -from context.service.database.Tools import fast_hasher, grpc_to_enum, remove_dict_key +from .Tools import fast_hasher, grpc_to_enum, remove_dict_key LOGGER = logging.getLogger(__name__) diff --git a/src/context/service/database/EndPointModel.py b/src/context/service/database/EndPointModel.py index b7c220a00b2a85b2c3f4c11a2eceb3aa66aadc5a..38b87d6f37c4e99dd3790f4d8802acd03873f77d 100644 --- a/src/context/service/database/EndPointModel.py +++ b/src/context/service/database/EndPointModel.py @@ -4,8 +4,8 @@ from common.orm.fields.ForeignKeyField import ForeignKeyField from common.orm.fields.PrimaryKeyField import PrimaryKeyField from common.orm.fields.StringField import StringField from common.orm.model.Model import Model -from context.service.database.DeviceModel import DeviceModel -from context.service.database.TopologyModel import TopologyModel +from .DeviceModel import DeviceModel +from .TopologyModel import TopologyModel LOGGER = logging.getLogger(__name__) diff --git a/src/context/service/grpc_server/ContextService.py b/src/context/service/grpc_server/ContextService.py index ab7653e37d318d0bfeea4a60213206d391c0dfda..9f1028dc9eed4d172c49f9b9f916572093c850f0 100644 --- a/src/context/service/grpc_server/ContextService.py +++ b/src/context/service/grpc_server/ContextService.py @@ -29,7 +29,7 @@ class ContextService: self.server = None def start(self): - self.endpoint = '{:s}:{:s}'.format(self.address, str(self.port)) + self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( str(self.endpoint), str(self.max_workers))) diff --git a/src/device/service/Tools.py b/src/device/_old_code/Tools.py similarity index 100% rename from src/device/service/Tools.py rename to src/device/_old_code/Tools.py diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index 29fdc97ce1ba5c8d7266ecdac340dce3e3670425..7773d1ada6825fa60ee9eb995dd54673b641c740 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -4,17 +4,22 @@ from concurrent import futures from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server from device.service.DeviceServiceServicerImpl import DeviceServiceServicerImpl -from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from device.service.data_cache.DataCache import DataCache +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache BIND_ADDRESS = '0.0.0.0' LOGGER = logging.getLogger(__name__) class DeviceService: - def __init__(self, database, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): - 
self.database = database + def __init__( + self, data_cache : DataCache, driver_instance_cache : DriverInstanceCache, + address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): + + self.data_cache = data_cache + self.driver_instance_cache = driver_instance_cache self.address = address self.port = port self.endpoint = None @@ -26,14 +31,14 @@ class DeviceService: self.server = None def start(self): - self.endpoint = '{}:{}'.format(self.address, self.port) - LOGGER.debug('Starting Service (tentative endpoint: {}, max_workers: {})...'.format( - self.endpoint, self.max_workers)) + self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) + LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( + str(self.endpoint), str(self.max_workers))) self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - self.device_servicer = DeviceServiceServicerImpl(self.database) + self.device_servicer = DeviceServiceServicerImpl(self.data_cache, self.driver_instance_cache) add_DeviceServiceServicer_to_server(self.device_servicer, self.server) self.health_servicer = HealthServicer( @@ -41,15 +46,15 @@ class DeviceService: add_HealthServicer_to_server(self.health_servicer, self.server) port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{}:{}'.format(self.address, port) - LOGGER.info('Listening on {}...'.format(self.endpoint)) + self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) + LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) self.server.start() self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member LOGGER.debug('Service started') def stop(self): - LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period)) + LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) self.health_servicer.enter_graceful_shutdown() self.server.stop(self.grace_period) LOGGER.debug('Service stopped') diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index c5893b7387b46d06262a808aea09e870178e7648..7b174851c9f7a76ffe76b725a066158434056ea0 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -1,174 +1,95 @@ +from typing import Any, List, Tuple import grpc, logging -from prometheus_client import Counter, Histogram -from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID -from common.database.api.Database import Database -from common.database.api.context.topology.device.OperationalStatus import OperationalStatus -from common.exceptions.ServiceException import ServiceException -from device.proto.context_pb2 import DeviceId, Device, Empty +from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method +from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty from device.proto.device_pb2_grpc import DeviceServiceServicer -from device.service.Tools import check_device_id_request, check_device_request +from .data_cache.DataCache import DataCache +from .driver_api._Driver import _Driver +from .driver_api.DriverInstanceCache import DriverInstanceCache +from .driver_api.FilterFields import FilterFieldEnum +#from .Tools import check_device_id_request, check_device_request LOGGER = 
logging.getLogger(__name__) -ADDDEVICE_COUNTER_STARTED = Counter ('device_adddevice_counter_started', - 'Device:AddDevice counter of requests started' ) -ADDDEVICE_COUNTER_COMPLETED = Counter ('device_adddevice_counter_completed', - 'Device:AddDevice counter of requests completed') -ADDDEVICE_COUNTER_FAILED = Counter ('device_adddevice_counter_failed', - 'Device:AddDevice counter of requests failed' ) -ADDDEVICE_HISTOGRAM_DURATION = Histogram('device_adddevice_histogram_duration', - 'Device:AddDevice histogram of request duration') - -CONFIGUREDEVICE_COUNTER_STARTED = Counter ('device_configuredevice_counter_started', - 'Device:ConfigureDevice counter of requests started' ) -CONFIGUREDEVICE_COUNTER_COMPLETED = Counter ('device_configuredevice_counter_completed', - 'Device:ConfigureDevice counter of requests completed') -CONFIGUREDEVICE_COUNTER_FAILED = Counter ('device_configuredevice_counter_failed', - 'Device:ConfigureDevice counter of requests failed' ) -CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram('device_configuredevice_histogram_duration', - 'Device:ConfigureDevice histogram of request duration') - -DELETEDEVICE_COUNTER_STARTED = Counter ('device_deletedevice_counter_started', - 'Device:DeleteDevice counter of requests started' ) -DELETEDEVICE_COUNTER_COMPLETED = Counter ('device_deletedevice_counter_completed', - 'Device:DeleteDevice counter of requests completed') -DELETEDEVICE_COUNTER_FAILED = Counter ('device_deletedevice_counter_failed', - 'Device:DeleteDevice counter of requests failed' ) -DELETEDEVICE_HISTOGRAM_DURATION = Histogram('device_deletedevice_histogram_duration', - 'Device:DeleteDevice histogram of request duration') +SERVICE_NAME = 'Device' +METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig'] +METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DeviceServiceServicerImpl(DeviceServiceServicer): - def __init__(self, database : Database): + def __init__(self, data_cache : DataCache, driver_instance_cache : DriverInstanceCache): LOGGER.debug('Creating Servicer...') - self.database = database + self.data_cache = data_cache + self.driver_instance_cache = driver_instance_cache LOGGER.debug('Servicer Created') - @ADDDEVICE_HISTOGRAM_DURATION.time() - def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId: - ADDDEVICE_COUNTER_STARTED.inc() - try: - LOGGER.debug('AddDevice request: {}'.format(str(request))) - - # ----- Validate request data and pre-conditions ----------------------------------------------------------- - device_id, device_type, device_config, device_opstat, db_endpoints_ports = \ - check_device_request('AddDevice', request, self.database, LOGGER) - - # ----- Implement changes in the database ------------------------------------------------------------------ - db_context = self.database.context(DEFAULT_CONTEXT_ID).create() - db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() - db_device = db_topology.device(device_id).create(device_type, device_config, device_opstat) - for db_endpoint,port_type in db_endpoints_ports: - db_endpoint.create(port_type) - - # ----- Compose reply -------------------------------------------------------------------------------------- - reply = DeviceId(**db_device.dump_id()) - LOGGER.debug('AddDevice reply: {}'.format(str(reply))) - ADDDEVICE_COUNTER_COMPLETED.inc() - return reply - except ServiceException as e: - LOGGER.exception('AddDevice exception') - ADDDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # 
pragma: no cover - LOGGER.exception('AddDevice exception') - ADDDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - - @CONFIGUREDEVICE_HISTOGRAM_DURATION.time() - def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId: - CONFIGUREDEVICE_COUNTER_STARTED.inc() - try: - LOGGER.debug('ConfigureDevice request: {}'.format(str(request))) - - # ----- Validate request data and pre-conditions ----------------------------------------------------------- - device_id, device_type, device_config, device_opstat, db_endpoints_ports = \ - check_device_request('UpdateDevice', request, self.database, LOGGER) - - # ----- Implement changes in the database ------------------------------------------------------------------ - db_context = self.database.context(DEFAULT_CONTEXT_ID).create() - db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() - db_device = db_topology.device(device_id) - - db_device_attributes = db_device.attributes.get(attributes=['device_type']) - # should not happen, device creation through Database API ensures all fields are always present - if len(db_device_attributes) == 0: # pragma: no cover - msg = 'Attribute device_type for Device({}) does not exist in the database.' # pragma: no cover - msg = msg.format(device_id) # pragma: no cover - raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg) # pragma: no cover - - db_device_type = db_device_attributes.get('device_type') - # should not happen, device creation through Database API ensures all fields are always present - if len(db_device_type) == 0: # pragma: no cover - msg = 'Attribute device_type for Device({}) is empty in the database.' # pragma: no cover - msg = msg.format(device_id) # pragma: no cover - raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg) # pragma: no cover - - if db_device_type != device_type: - msg = 'Device({}) has Type({}) in the database. Cannot be changed to Type({}).' - msg = msg.format(device_id, db_device_type, device_type) - raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) - - if len(db_endpoints_ports) > 0: - msg = 'Endpoints belonging to Device({}) cannot be modified.' 
- msg = msg.format(device_id) - raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) - - update_attributes = {} - - if len(device_config) > 0: - update_attributes['device_config'] = device_config - - if device_opstat != OperationalStatus.KEEP_STATE: - update_attributes['device_operational_status'] = device_opstat - - if len(update_attributes) == 0: - msg = ' '.join([ - 'Any change has been requested for Device({}).', - 'Either specify a new configuration or a new device operational status.', - ]) - msg = msg.format(device_id) - raise ServiceException(grpc.StatusCode.ABORTED, msg) - - db_device.update(update_attributes=update_attributes) - - # ----- Compose reply -------------------------------------------------------------------------------------- - reply = DeviceId(**db_device.dump_id()) - LOGGER.debug('ConfigureDevice reply: {}'.format(str(reply))) - CONFIGUREDEVICE_COUNTER_COMPLETED.inc() - return reply - except ServiceException as e: - LOGGER.exception('ConfigureDevice exception') - CONFIGUREDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('ConfigureDevice exception') - CONFIGUREDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) - - @DELETEDEVICE_HISTOGRAM_DURATION.time() - def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty: - DELETEDEVICE_COUNTER_STARTED.inc() - try: - LOGGER.debug('DeleteDevice request: {}'.format(str(request))) - - # ----- Validate request data and pre-conditions ----------------------------------------------------------- - device_id = check_device_id_request('DeleteDevice', request, self.database, LOGGER) - - # ----- Implement changes in the database ------------------------------------------------------------------ - db_context = self.database.context(DEFAULT_CONTEXT_ID).create() - db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() - db_topology.device(device_id).delete() - - # ----- Compose reply -------------------------------------------------------------------------------------- - reply = Empty() - LOGGER.debug('DeleteDevice reply: {}'.format(str(reply))) - DELETEDEVICE_COUNTER_COMPLETED.inc() - return reply - except ServiceException as e: - LOGGER.exception('DeleteDevice exception') - DELETEDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover - LOGGER.exception('DeleteDevice exception') - DELETEDEVICE_COUNTER_FAILED.inc() - grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + @safe_and_metered_rpc_method(METRICS, LOGGER) + def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: + device_id = request.device_id + device_uuid = device_id.device_uuid.uuid + + self.data_cache.sync_device_from_context(device_uuid) + db_device,_ = self.data_cache.set_device(request) + + driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + driver : _Driver = self.driver_instance_cache.get(device_uuid, **driver_filter_fields) + driver.Connect() + + running_config_rules = driver.GetConfig() + self.data_cache.update_device_config_in_local_database(device_uuid, 'running', running_config_rules) + + initial_config_rules = driver.GetInitialConfig() + self.data_cache.update_device_config_in_local_database(device_uuid, 'initial', initial_config_rules) + + self.data_cache.sync_device_to_context(device_uuid) + + return DeviceId(**db_device.dump_id()) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def 
ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: + device_id = request.device_id + device_uuid = device_id.device_uuid.uuid + config_name = 'running' + + self.data_cache.sync_device_from_context(device_uuid) + db_device,_ = self.data_cache.set_device(request) + + # Compute list of changes between device_config in context, and device_config in request + set_changes : List[Tuple[str, Any]] = [] + delete_changes : List[Tuple[str, Any]] = [] + subscriptions : List[Tuple[str, Any]] = [] + unsubscriptions : List[Tuple[str, Any]] = [] + + driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + driver : _Driver = self.driver_instance_cache.get(device_uuid, **driver_filter_fields) + driver.Connect() + + result = driver.SetConfig(set_changes) + # check result + result = driver.DeleteConfig(delete_changes) + # check result + result = driver.SubscribeState(subscriptions) + # check result + result = driver.UnsubscribeState(unsubscriptions) + # check result + + self.data_cache.sync_device_to_context(device_uuid) + return DeviceId(**db_device.dump_id()) + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: + device_uuid = request.device_uuid.uuid + self.data_cache.sync_device_from_context(device_uuid) + db_device = self.data_cache.get_device(device_uuid) + driver_filter_fields = self.data_cache.get_device_driver_filter_fields(device_uuid) + driver : _Driver = self.driver_instance_cache.get(device_uuid, **driver_filter_fields) + driver.Disconnect() + self.data_cache.delete_device_from_context(device_uuid) + db_device.delete() + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig: + device_uuid = request.device_uuid.uuid + self.data_cache.sync_device_from_context(device_uuid) + db_device = self.data_cache.get_device(device_uuid) + return DeviceConfig(device_config={'config_rules': db_device.dump_initial_config()}) diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index d31512a29cae7e347f71c17a08839ef031037bf0..8c331036dbc3d6d386e8cbffe74c0fe7a275f3b7 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -1,10 +1,12 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_setting -#from common.database.Factory import get_database -from context.client.ContextClient import ContextClient -from device.service.DeviceService import DeviceService from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT +from device.service.DeviceService import DeviceService +from device.service.data_cache.DataCache import DataCache +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from device.service.drivers import DRIVERS terminate = threading.Event() LOGGER = None @@ -16,11 +18,13 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - 
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) + grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) + max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) + grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) + log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) + metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) + context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=None ) + context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=None ) logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) @@ -33,17 +37,16 @@ def main(): # Start metrics server start_http_server(metrics_port) - # Get context client instance - context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=None) - context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=None) - if context_service_host is None or context_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( - str(context_service_host), str(context_service_port))) - context_client = ContextClient(context_service_host, context_service_port) - #database = get_database() + # Initialize DataCache + data_cache = DataCache(context_service_host, context_service_port) + + # Initialize Driver framework + driver_factory = DriverFactory(DRIVERS) + driver_instance_cache = DriverInstanceCache(driver_factory) # Starting device service - grpc_service = DeviceService(context_client, port=service_port, max_workers=max_workers, grace_period=grace_period) + grpc_service = DeviceService( + data_cache, driver_instance_cache, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) grpc_service.start() # Wait for Ctrl+C or termination signal @@ -51,6 +54,7 @@ def main(): LOGGER.info('Terminating...') grpc_service.stop() + driver_instance_cache.terminate() LOGGER.info('Bye') return 0 diff --git a/src/device/service/data_cache/DataCache.py b/src/device/service/data_cache/DataCache.py new file mode 100644 index 0000000000000000000000000000000000000000..1468eb848f03076f7356772b1be1b21f849ef335 --- /dev/null +++ b/src/device/service/data_cache/DataCache.py @@ -0,0 +1,58 @@ +import grpc, logging +from typing import Any, Dict, List, Optional, Tuple, Union +from common.orm.Database import Database +from common.orm.Factory import get_database_backend +from common.orm.HighLevel import get_object +from common.orm.backend.BackendEnum import BackendEnum +from context.client.ContextClient import ContextClient +from device.proto.context_pb2 import Device, DeviceId +from database.ConfigModel import set_config +from device.service.driver_api.FilterFields import FilterFieldEnum +from .database.DeviceModel import DeviceModel, DriverModel +from .DeviceTools import update_device_config_in_local_database, update_device_in_local_database + +LOGGER = logging.getLogger(__name__) + +class DataCache: + def __init__(self, context_address : str, context_port : Union[str, int]) -> None: + if context_address is None or context_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( + str(context_address), str(context_port))) + self._context_client = ContextClient(context_address, context_port) + self._database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) + + def get_device(self, device_uuid : str) -> Optional[DeviceModel]: + return get_object(self._database, DeviceModel, 
device_uuid, raise_if_not_found=False) + + def get_device_driver_filter_fields(self, device_uuid : str) -> Dict[FilterFieldEnum, Any]: + db_device = self.get_device(device_uuid) + db_driver_pks = db_device.references(DriverModel) + db_driver_names = [DriverModel(self._database, pk).driver.value for pk,_ in db_driver_pks] + return { + FilterFieldEnum.DEVICE_TYPE: db_device.device_type, + FilterFieldEnum.DRIVER : db_driver_names, + } + + def set_device(self, device : Device) -> Tuple[DeviceModel, bool]: + return update_device_in_local_database(self._database, device) + + def sync_device_from_context(self, device_uuid : str) -> bool: + try: + device : Device = self._context_client.GetDevice(DeviceId(device_uuid={'uuid': device_uuid})) + except grpc.RpcError as e: + if e.value.code() != grpc.StatusCode.NOT_FOUND: raise + return None + return update_device_in_local_database(self._database, device) + + def sync_device_to_context(self, device_uuid : str): + db_device = get_object(self._database, DeviceModel, device_uuid, raise_if_not_found=False) + self._context_client.SetDevice(Device(**db_device.dump( + include_config_rules=True, include_drivers=True, include_endpoints=True))) + + def delete_device_from_context(self, device_uuid : str): + db_device = get_object(self._database, DeviceModel, device_uuid, raise_if_not_found=False) + self._context_client.RemoveDevice(DeviceId(**db_device.dump_id())) + + def update_device_config_in_local_database( + self, device_uuid : str, config_name : str, config_rules : List[Tuple[str, Any]]): + update_device_config_in_local_database(self._database, device_uuid, config_name, config_rules) diff --git a/src/device/service/data_cache/DeviceTools.py b/src/device/service/data_cache/DeviceTools.py new file mode 100644 index 0000000000000000000000000000000000000000..ce617f3ecef37a3b62ae1562f18c8a88802720e2 --- /dev/null +++ b/src/device/service/data_cache/DeviceTools.py @@ -0,0 +1,88 @@ +import logging, operator +from typing import Any, List, Tuple +from common.orm.Database import Database +from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object +from common.orm.backend.Tools import key_to_str +from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException +from context.proto.context_pb2 import ConfigActionEnum +from device.proto.context_pb2 import Device +from device.service.data_cache.database.Tools import fast_hasher, remove_dict_key +from .database.ConfigModel import ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, set_config +from .database.ContextModel import ContextModel +from .database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers +from .database.EndPointModel import EndPointModel +from .database.TopologyModel import TopologyModel + +LOGGER = logging.getLogger(__name__) + +def update_device_in_local_database(database : Database, device : Device) -> Tuple[DeviceModel, bool]: + device_uuid = device.device_id.device_uuid.uuid + + for i,endpoint in enumerate(device.device_endpoints): + endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid + if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid + if device_uuid != endpoint_device_uuid: + raise InvalidArgumentException( + 'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid, + ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)]) + + running_config_result = set_config(database, device_uuid, 'running', 
device.device_config.config_rules) + result : Tuple[DeviceModel, bool] = update_or_create_object(database, DeviceModel, device_uuid, { + 'device_uuid' : device_uuid, + 'device_type' : device.device_type, + 'device_operational_status': grpc_to_enum__device_operational_status(device.device_operational_status), + 'device_config_fk' : running_config_result[0][0], + }) + db_device, updated = result + set_drivers(database, db_device, device.device_drivers) + + for i,endpoint in enumerate(device.device_endpoints): + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid + if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid + + str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) + endpoint_attributes = { + 'device_fk' : db_device, + 'endpoint_uuid': endpoint_uuid, + 'endpoint_type': endpoint.endpoint_type, + } + + endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid + endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid + if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: + db_context : ContextModel = get_or_create_object(database, ContextModel, endpoint_topology_context_uuid) + + str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) + db_topology : TopologyModel = get_or_create_object(database, TopologyModel, str_topology_key, defaults={ + 'context_fk': db_context, + }) + + str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') + endpoint_attributes['topology_fk'] = db_topology + + result : Tuple[EndPointModel, bool] = update_or_create_object( + database, EndPointModel, str_endpoint_key, endpoint_attributes) + #db_endpoint, updated = result + + return db_device + +def update_device_config_in_local_database( + database : Database, device_uuid : str, config_name : str, config_rules : List[Tuple[str, Any]]): + + str_config_key = key_to_str([device_uuid, config_name], separator=':') + db_config = get_object(database, ConfigModel, str_config_key) + db_config_rule_pks = db_config.references(ConfigRuleModel) + for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete() + + for position,config_rule in enumerate(config_rules): + config_rule_resource_key, config_rule_resource_value = config_rule + str_rule_key_hash = fast_hasher(config_rule_resource_key) + str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') + update_or_create_object(database, ConfigRuleModel, str_config_rule_key, { + 'config_fk': db_config, + 'position' : position, + 'action' : ORM_ConfigActionEnum.SET, + 'key' : config_rule_resource_key, + 'value' : config_rule_resource_value, + }) diff --git a/src/device/service/data_cache/__init__.py b/src/device/service/data_cache/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/device/service/data_cache/database/ConfigModel.py b/src/device/service/data_cache/database/ConfigModel.py new file mode 100644 index 0000000000000000000000000000000000000000..73fc06eb57c943ce456b7c4ded7cb5f37a28fef1 --- /dev/null +++ b/src/device/service/data_cache/database/ConfigModel.py @@ -0,0 +1,84 @@ +import functools, logging, operator +from enum import Enum +from typing import Dict, List, Tuple, Union +from common.orm.Database import Database +from common.orm.HighLevel import get_or_create_object, update_or_create_object +from 
common.orm.backend.Tools import key_to_str +from common.orm.fields.EnumeratedField import EnumeratedField +from common.orm.fields.ForeignKeyField import ForeignKeyField +from common.orm.fields.IntegerField import IntegerField +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.fields.StringField import StringField +from common.orm.model.Model import Model +from device.proto.context_pb2 import ConfigActionEnum +from .Tools import fast_hasher, grpc_to_enum, remove_dict_key + +LOGGER = logging.getLogger(__name__) + +class ORM_ConfigActionEnum(Enum): + UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED + SET = ConfigActionEnum.CONFIGACTION_SET + DELETE = ConfigActionEnum.CONFIGACTION_DELETE + +grpc_to_enum__config_action = functools.partial( + grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum) + +class ConfigModel(Model): # pylint: disable=abstract-method + pk = PrimaryKeyField() + + def dump(self) -> List[Dict]: + db_config_rule_pks = self.references(ConfigRuleModel) + config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks] + config_rules = sorted(config_rules, key=operator.itemgetter('position')) + return [remove_dict_key(config_rule, 'position') for config_rule in config_rules] + +class ConfigRuleModel(Model): # pylint: disable=abstract-method + pk = PrimaryKeyField() + config_fk = ForeignKeyField(ConfigModel) + position = IntegerField(min_value=0, required=True) + action = EnumeratedField(ORM_ConfigActionEnum, required=True) + key = StringField(required=True, allow_empty=False) + value = StringField(required=True, allow_empty=False) + + def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ + result = { + 'action': self.action.value, + 'resource_key': self.key, + 'resource_value': self.value, + } + if include_position: result['position'] = self.position + return result + +def set_config_rule( + database : Database, db_config : ConfigModel, grpc_config_rule, position : int + ) -> Tuple[ConfigRuleModel, bool]: + + str_rule_key_hash = fast_hasher(grpc_config_rule.resource_key) + str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') + + result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, { + 'config_fk': db_config, + 'position' : position, + 'action' : grpc_to_enum__config_action(grpc_config_rule.action), + 'key' : grpc_config_rule.resource_key, + 'value' : grpc_config_rule.resource_value, + }) + db_config_rule, updated = result + return db_config_rule, updated + +def set_config( + database : Database, db_parent_pk : str, config_name : str, grpc_config_rules + ) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]: + + str_config_key = key_to_str([db_parent_pk, config_name], separator=':') + result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key) + db_config, created = result + + db_objects = [(db_config, created)] + + for position,grpc_config_rule in enumerate(grpc_config_rules): + result : Tuple[ConfigRuleModel, bool] = set_config_rule(database, db_config, grpc_config_rule, position) + db_config_rule, updated = result + db_objects.append((db_config_rule, updated)) + + return db_objects diff --git a/src/device/service/data_cache/database/ContextModel.py b/src/device/service/data_cache/database/ContextModel.py new file mode 100644 index 0000000000000000000000000000000000000000..7dddef350df2fb39838e626559e3b3e00337f93b --- /dev/null +++ 
b/src/device/service/data_cache/database/ContextModel.py @@ -0,0 +1,24 @@ +import logging +from typing import Dict, List +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.fields.StringField import StringField +from common.orm.model.Model import Model + +LOGGER = logging.getLogger(__name__) + +class ContextModel(Model): + pk = PrimaryKeyField() + context_uuid = StringField(required=True, allow_empty=False) + + def dump_id(self) -> Dict: + return {'context_uuid': {'uuid': self.context_uuid}} + + def dump_topology_ids(self) -> List[Dict]: + from .TopologyModel import TopologyModel # pylint: disable=import-outside-toplevel + db_topology_pks = self.references(TopologyModel) + return [TopologyModel(self.database, pk).dump_id() for pk,_ in db_topology_pks] + + def dump(self, include_topologies=True) -> Dict: # pylint: disable=arguments-differ + result = {'context_id': self.dump_id()} + if include_topologies: result['topology_ids'] = self.dump_topology_ids() + return result diff --git a/src/device/service/data_cache/database/DeviceModel.py b/src/device/service/data_cache/database/DeviceModel.py new file mode 100644 index 0000000000000000000000000000000000000000..bba19d787622019b7b8f25de9c07b7c0984ec42c --- /dev/null +++ b/src/device/service/data_cache/database/DeviceModel.py @@ -0,0 +1,91 @@ +import functools, logging +from enum import Enum +from typing import Dict, List +from common.orm.Database import Database +from common.orm.backend.Tools import key_to_str +from common.orm.fields.EnumeratedField import EnumeratedField +from common.orm.fields.ForeignKeyField import ForeignKeyField +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.fields.StringField import StringField +from common.orm.model.Model import Model +from device.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum +from .ConfigModel import ConfigModel +from .Tools import grpc_to_enum + +LOGGER = logging.getLogger(__name__) + +class ORM_DeviceDriverEnum(Enum): + UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED + OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG + TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API + P4 = DeviceDriverEnum.DEVICEDRIVER_P4 + IETF_NETWORK_TOPOLOGY = DeviceDriverEnum.DEVICEDRIVER_IETF_NETWORK_TOPOLOGY + ONF_TR_352 = DeviceDriverEnum.DEVICEDRIVER_ONF_TR_352 + +grpc_to_enum__device_driver = functools.partial( + grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) + +class ORM_DeviceOperationalStatusEnum(Enum): + UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED + DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED + ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + +grpc_to_enum__device_operational_status = functools.partial( + grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum) + +class DeviceModel(Model): + pk = PrimaryKeyField() + device_uuid = StringField(required=True, allow_empty=False) + device_type = StringField() + device_initial_config_fk = ForeignKeyField(ConfigModel) + device_running_config_fk = ForeignKeyField(ConfigModel) + device_operational_status = EnumeratedField(ORM_DeviceOperationalStatusEnum, required=True) + + def dump_id(self) -> Dict: + return {'device_uuid': {'uuid': self.device_uuid}} + + def dump_initial_config(self) -> Dict: + return ConfigModel(self.database, self.device_initial_config_fk).dump() + + def dump_running_config(self) -> Dict: + return ConfigModel(self.database, 
self.device_running_config_fk).dump() + + def dump_drivers(self) -> List[int]: + db_driver_pks = self.references(DriverModel) + return [DriverModel(self.database, pk).dump() for pk,_ in db_driver_pks] + + def dump_endpoints(self) -> List[Dict]: + from .EndPointModel import EndPointModel # pylint: disable=import-outside-toplevel + db_endpoints_pks = self.references(EndPointModel) + return [EndPointModel(self.database, pk).dump() for pk,_ in db_endpoints_pks] + + def dump( # pylint: disable=arguments-differ + self, include_config_rules=True, include_drivers=True, include_endpoints=True + ) -> Dict: + result = { + 'device_id': self.dump_id(), + 'device_type': self.device_type, + 'device_operational_status': self.device_operational_status.value, + } + if include_config_rules: result.setdefault('device_config', {})['config_rules'] = self.dump_running_config() + if include_drivers: result['device_drivers'] = self.dump_drivers() + if include_endpoints: result['device_endpoints'] = self.dump_endpoints() + return result + +class DriverModel(Model): # pylint: disable=abstract-method + pk = PrimaryKeyField() + device_fk = ForeignKeyField(DeviceModel) + driver = EnumeratedField(ORM_DeviceDriverEnum, required=True) + + def dump(self) -> Dict: + return self.driver.value + +def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers): + db_device_pk = db_device.pk + for driver in grpc_device_drivers: + orm_driver = grpc_to_enum__device_driver(driver) + str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) + db_device_driver = DriverModel(database, str_device_driver_key) + db_device_driver.device_fk = db_device + db_device_driver.driver = orm_driver + db_device_driver.save() diff --git a/src/device/service/data_cache/database/EndPointModel.py b/src/device/service/data_cache/database/EndPointModel.py new file mode 100644 index 0000000000000000000000000000000000000000..38b87d6f37c4e99dd3790f4d8802acd03873f77d --- /dev/null +++ b/src/device/service/data_cache/database/EndPointModel.py @@ -0,0 +1,33 @@ +import logging +from typing import Dict +from common.orm.fields.ForeignKeyField import ForeignKeyField +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.fields.StringField import StringField +from common.orm.model.Model import Model +from .DeviceModel import DeviceModel +from .TopologyModel import TopologyModel + +LOGGER = logging.getLogger(__name__) + +class EndPointModel(Model): + pk = PrimaryKeyField() + topology_fk = ForeignKeyField(TopologyModel, required=False) + device_fk = ForeignKeyField(DeviceModel) + endpoint_uuid = StringField(required=True, allow_empty=False) + endpoint_type = StringField() + + def dump_id(self) -> Dict: + device_id = DeviceModel(self.database, self.device_fk).dump_id() + result = { + 'device_id': device_id, + 'endpoint_uuid': {'uuid': self.endpoint_uuid}, + } + if self.topology_fk is not None: + result['topology_id'] = TopologyModel(self.database, self.topology_fk).dump_id() + return result + + def dump(self) -> Dict: + return { + 'endpoint_id': self.dump_id(), + 'endpoint_type': self.endpoint_type, + } diff --git a/src/device/service/data_cache/database/Tools.py b/src/device/service/data_cache/database/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..36ffbcd46fcf686371b0799445ce4f9ce5b75838 --- /dev/null +++ b/src/device/service/data_cache/database/Tools.py @@ -0,0 +1,58 @@ +import hashlib, re +from enum import Enum +from typing import Dict, List, Tuple, Union + +# Convenient helper 
function to remove dictionary items in dict/list/set comprehensions. + +def remove_dict_key(dictionary : Dict, key : str): + dictionary.pop(key, None) + return dictionary + +# Enumeration classes are redundant with gRPC classes, but gRPC does not provide a programmatical method to retrieve +# the values it expects from strings containing the desired value symbol or its integer value, so a kind of mapping is +# required. Besides, ORM Models expect Enum classes in EnumeratedFields; we create specific and conveniently defined +# Enum classes to serve both purposes. + +def grpc_to_enum(grpc_enum_class, orm_enum_class : Enum, grpc_enum_value): + grpc_enum_name = grpc_enum_class.Name(grpc_enum_value) + grpc_enum_prefix = orm_enum_class.__name__.upper() + grpc_enum_prefix = re.sub(r'^ORM_(.+)$', r'\1', grpc_enum_prefix) + grpc_enum_prefix = re.sub(r'^(.+)ENUM$', r'\1', grpc_enum_prefix) + grpc_enum_prefix = grpc_enum_prefix + '_' + orm_enum_name = grpc_enum_name.replace(grpc_enum_prefix, '') + orm_enum_value = orm_enum_class._member_map_.get(orm_enum_name) # pylint: disable=protected-access + return orm_enum_value + +# For some models, it is convenient to produce a string hash for fast comparisons of existence or modification. Method +# fast_hasher computes configurable length (between 1 and 64 byte) hashes and retrieves them in hex representation. + +FASTHASHER_ITEM_ACCEPTED_FORMAT = 'Union[bytes, str]' +FASTHASHER_DATA_ACCEPTED_FORMAT = 'Union[{fmt:s}, List[{fmt:s}], Tuple[{fmt:s}]]'.format( + fmt=FASTHASHER_ITEM_ACCEPTED_FORMAT) + +def fast_hasher(data : Union[bytes, str, List[Union[bytes, str]], Tuple[Union[bytes, str]]], digest_size : int = 8): + hasher = hashlib.blake2b(digest_size=digest_size) + # Do not accept sets, dicts, or other unordered dats tructures since their order is arbitrary thus producing + # different hashes depending on the order. Consider adding support for sets or dicts with previous sorting of + # items by their key. 
+ + if isinstance(data, bytes): + data = [data] + elif isinstance(data, str): + data = [data.encode('UTF-8')] + elif isinstance(data, (list, tuple)): + pass + else: + msg = 'data({:s}) must be {:s}, found {:s}' + raise TypeError(msg.format(str(data), FASTHASHER_DATA_ACCEPTED_FORMAT, str(type(data)))) + + for i,item in enumerate(data): + if isinstance(item, str): + item = item.encode('UTF-8') + elif isinstance(item, bytes): + pass + else: + msg = 'data[{:d}]({:s}) must be {:s}, found {:s}' + raise TypeError(msg.format(i, str(item), FASTHASHER_ITEM_ACCEPTED_FORMAT, str(type(item)))) + hasher.update(item) + return hasher.hexdigest() diff --git a/src/device/service/data_cache/database/TopologyModel.py b/src/device/service/data_cache/database/TopologyModel.py new file mode 100644 index 0000000000000000000000000000000000000000..980d234dbe6342b59c6a8a4f3ccc264c602e1bc4 --- /dev/null +++ b/src/device/service/data_cache/database/TopologyModel.py @@ -0,0 +1,25 @@ +import logging +from typing import Dict +from common.orm.fields.ForeignKeyField import ForeignKeyField +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.fields.StringField import StringField +from common.orm.model.Model import Model +from .ContextModel import ContextModel + +LOGGER = logging.getLogger(__name__) + +class TopologyModel(Model): + pk = PrimaryKeyField() + context_fk = ForeignKeyField(ContextModel) + topology_uuid = StringField(required=True, allow_empty=False) + + def dump_id(self) -> Dict: + context_id = ContextModel(self.database, self.context_fk).dump_id() + return { + 'context_id': context_id, + 'topology_uuid': {'uuid': self.topology_uuid}, + } + + def dump(self) -> Dict: + result = {'topology_id': self.dump_id()} + return result diff --git a/src/device/service/data_cache/database/__init__.py b/src/device/service/data_cache/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ff9b7da0cd0f83f8e01bbb904d8e2471503f00ca --- /dev/null +++ b/src/device/service/data_cache/database/__init__.py @@ -0,0 +1,2 @@ +# In-Memory database with a simplified representation of Context Database focused on the Device model. +# Used as an internal configuration cache, for message validation, and message formatting purposes. 
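Note (not part of the patch): the Tools.py helpers above build composite primary keys for the in-memory cache by hashing resource keys with a short blake2b digest (fast_hasher) and joining key parts with ':' (key_to_str). A minimal, self-contained sketch of that key scheme follows; the function name and sample values are illustrative stand-ins only.

import hashlib

def example_config_rule_key(config_pk: str, resource_key: str, digest_size: int = 8) -> str:
    # Mirrors fast_hasher(): a short blake2b digest gives a stable, fixed-length id per resource key.
    hasher = hashlib.blake2b(digest_size=digest_size)
    hasher.update(resource_key.encode('UTF-8'))
    # Mirrors key_to_str(..., separator=':'): compose the parent Config pk with the hashed rule key.
    return ':'.join([config_pk, hasher.hexdigest()])

# e.g. 'device-1:running' + '/interfaces/interface[eth0]' -> 'device-1:running:<16-hex-char digest>'
print(example_config_rule_key('device-1:running', '/interfaces/interface[eth0]'))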
diff --git a/src/device/service/driver_api/DriverFactory.py b/src/device/service/driver_api/DriverFactory.py index c226e434c75286d99c2051098e0d18d9cf483caa..e80e9ea84de18c747e78232e36429e9c69e88bc2 100644 --- a/src/device/service/driver_api/DriverFactory.py +++ b/src/device/service/driver_api/DriverFactory.py @@ -1,50 +1,62 @@ -from typing import Dict, Set -from device.service.driver_api.QueryFields import QUERY_FIELDS +from typing import Any, Dict, Iterable, List, Set, Tuple from device.service.driver_api._Driver import _Driver -from device.service.driver_api.Exceptions import MultipleResultsForQueryException, UnsatisfiedQueryException, \ - UnsupportedDriverClassException, UnsupportedQueryFieldException, UnsupportedQueryFieldValueException +from device.service.driver_api.Exceptions import MultipleResultsForFilterException, UnsatisfiedFilterException, \ + UnsupportedDriverClassException, UnsupportedFilterFieldException, UnsupportedFilterFieldValueException +from device.service.driver_api.FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum class DriverFactory: - def __init__(self) -> None: + def __init__(self, drivers : List[Tuple[type, List[Dict[FilterFieldEnum, Any]]]]) -> None: self.__indices : Dict[str, Dict[str, Set[_Driver]]] = {} # Dict{field_name => Dict{field_value => Set{Driver}}} - def register_driver_class(self, driver_class, **query_fields): + for driver_class,filter_field_sets in drivers: + for filter_fields in filter_field_sets: + self.register_driver_class(driver_class, **filter_fields) + + def register_driver_class(self, driver_class, **filter_fields): if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ - unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) - if len(unsupported_query_fields) > 0: - raise UnsupportedQueryFieldException(unsupported_query_fields, driver_class_name=driver_name) + unsupported_filter_fields = set(filter_fields.keys()).difference(set(FILTER_FIELD_ALLOWED_VALUES.keys())) + if len(unsupported_filter_fields) > 0: + raise UnsupportedFilterFieldException(unsupported_filter_fields, driver_class_name=driver_name) - for field_name, field_value in query_fields.items(): + for field_name, field_values in filter_fields.items(): field_indice = self.__indices.setdefault(field_name, dict()) - field_enum_values = QUERY_FIELDS.get(field_name) - if field_enum_values is not None and field_value not in field_enum_values: - raise UnsupportedQueryFieldValueException( - field_name, field_value, field_enum_values, driver_class_name=driver_name) - field_indice_drivers = field_indice.setdefault(field_name, set()) - field_indice_drivers.add(driver_class) - - def get_driver_class(self, **query_fields) -> _Driver: - unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) - if len(unsupported_query_fields) > 0: raise UnsupportedQueryFieldException(unsupported_query_fields) + field_enum_values = FILTER_FIELD_ALLOWED_VALUES.get(field_name) + if not isinstance(field_values, Iterable) or isinstance(field_values, str): + field_values = [field_values] + for field_value in field_values: + if field_enum_values is not None and field_value not in field_enum_values: + raise UnsupportedFilterFieldValueException( + field_name, field_value, field_enum_values, driver_class_name=driver_name) + field_indice_drivers = field_indice.setdefault(field_name, set()) + field_indice_drivers.add(driver_class) + + def get_driver_class(self, 
**filter_fields) -> _Driver: + unsupported_filter_fields = set(filter_fields.keys()).difference(set(FILTER_FIELD_ALLOWED_VALUES.keys())) + if len(unsupported_filter_fields) > 0: raise UnsupportedFilterFieldException(unsupported_filter_fields) candidate_driver_classes = None - - for field_name, field_value in query_fields.items(): + for field_name, field_values in filter_fields.items(): field_indice = self.__indices.get(field_name) if field_indice is None: continue - field_enum_values = QUERY_FIELDS.get(field_name) - if field_enum_values is not None and field_value not in field_enum_values: - raise UnsupportedQueryFieldValueException(field_name, field_value, field_enum_values) - field_indice_drivers = field_indice.get(field_name) - if field_indice_drivers is None: continue - - candidate_driver_classes = set().union(field_indice_drivers) if candidate_driver_classes is None else \ + field_enum_values = FILTER_FIELD_ALLOWED_VALUES.get(field_name) + if not isinstance(field_values, Iterable) or isinstance(field_values, str): + field_values = [field_values] + + field_candidate_driver_classes = set() + for field_value in field_values: + if field_enum_values is not None and field_value not in field_enum_values: + raise UnsupportedFilterFieldValueException(field_name, field_value, field_enum_values) + field_indice_drivers = field_indice.get(field_name) + if field_indice_drivers is None: continue + field_candidate_driver_classes = field_candidate_driver_classes.union(field_indice_drivers) + + candidate_driver_classes = field_indice_drivers if candidate_driver_classes is None else \ candidate_driver_classes.intersection(field_indice_drivers) - if len(candidate_driver_classes) == 0: raise UnsatisfiedQueryException(query_fields) + if len(candidate_driver_classes) == 0: raise UnsatisfiedFilterException(filter_fields) if len(candidate_driver_classes) > 1: # TODO: Consider choosing driver with more query fields being satisfied (i.e., the most restrictive one) - raise MultipleResultsForQueryException(query_fields, {d.__name__ for d in candidate_driver_classes}) + raise MultipleResultsForFilterException(filter_fields, {d.__name__ for d in candidate_driver_classes}) return candidate_driver_classes.pop() diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py new file mode 100644 index 0000000000000000000000000000000000000000..88bfd044bce0099a5a558691dd6dbe657ea1be96 --- /dev/null +++ b/src/device/service/driver_api/DriverInstanceCache.py @@ -0,0 +1,42 @@ +import logging, threading +from typing import Any, Dict, Optional +from ._Driver import _Driver +from .DriverFactory import DriverFactory +from .Exceptions import DriverInstanceCacheTerminatedException + +LOGGER = logging.getLogger(__name__) + +class DriverInstanceCache: + def __init__(self, driver_factory : DriverFactory) -> None: + self._lock = threading.Lock() + self._terminate = threading.Event() + self._device_uuid__to__driver_instance : Dict[str, _Driver] = {} + self._driver_factory = driver_factory + + def get( + self, device_uuid : str, filter_fields : Dict[str, Any], address : Optional[str] = None, + port : Optional[int] = None, settings : Dict[str, Any] = {}) -> _Driver: + + if self._terminate.is_set(): + raise DriverInstanceCacheTerminatedException() + + with self._lock: + driver_instance = self._device_uuid__to__driver_instance.get(device_uuid) + if driver_instance is not None: return driver_instance + + driver_class = self._driver_factory.get_driver_class(**filter_fields) + 
driver_instance : _Driver = driver_class(address, port, **settings) + self._device_uuid__to__driver_instance[device_uuid] = driver_instance + return driver_instance + + def terminate(self): + self._terminate.set() + with self._lock: + while len(self._device_uuid__to__driver_instance) > 0: + try: + device_uuid,device_driver = self._device_uuid__to__driver_instance.popitem() + device_driver.Disconnect() + except: # pylint: disable=bare-except + msg = 'Error disconnecting Driver({:s}) from device. Will retry later...' + LOGGER.exception(msg.format(device_uuid)) + self._device_uuid__to__driver_instance[device_uuid] = device_driver diff --git a/src/device/service/driver_api/Exceptions.py b/src/device/service/driver_api/Exceptions.py index 2275bb04cbf9334a6823544bc48e8ad345d2dc20..9e70278a3363f5cb3010226467c67311ad0167b2 100644 --- a/src/device/service/driver_api/Exceptions.py +++ b/src/device/service/driver_api/Exceptions.py @@ -1,30 +1,41 @@ -class MultipleResultsForQueryException(Exception): - def __init__(self, query_fields, driver_names): - super().__init__('Multiple Drivers({}) satisfy QueryFields({})'.format(str(driver_names), str(query_fields))) +class MultipleResultsForFilterException(Exception): + def __init__(self, filter_fields, driver_names): + msg = 'Multiple Drivers({:s}) satisfy FilterFields({:s})' + super().__init__(msg.format(str(driver_names), str(filter_fields))) -class UnsatisfiedQueryException(Exception): - def __init__(self, query_fields): - super().__init__('No Driver satisfies QueryFields({})'.format(str(query_fields))) +class UnsatisfiedFilterException(Exception): + def __init__(self, filter_fields): + msg = 'No Driver satisfies FilterFields({:s})' + super().__init__(msg.format(str(filter_fields))) class UnsupportedDriverClassException(Exception): def __init__(self, driver_class_name): - super().__init__('Class({}) is not a subclass of _Driver'.format(str(driver_class_name))) + msg = 'Class({:s}) is not a subclass of _Driver' + super().__init__(msg.format(str(driver_class_name))) -class UnsupportedQueryFieldException(Exception): - def __init__(self, unsupported_query_fields, driver_class_name=None): +class UnsupportedFilterFieldException(Exception): + def __init__(self, unsupported_filter_fields, driver_class_name=None): if driver_class_name: - msg = 'QueryFields({}) specified by Driver({}) are not supported'.format( - str(unsupported_query_fields), str(driver_class_name)) + msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported' + msg = msg.format(str(unsupported_filter_fields), str(driver_class_name)) else: - msg = 'QueryFields({}) specified in query are not supported'.format(str(unsupported_query_fields)) + msg = 'FilterFields({:s}) specified in Filter are not supported' + msg = msg.format(str(unsupported_filter_fields)) super().__init__(msg) -class UnsupportedQueryFieldValueException(Exception): - def __init__(self, query_field_name, query_field_value, allowed_query_field_values, driver_class_name=None): +class UnsupportedFilterFieldValueException(Exception): + def __init__(self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None): if driver_class_name: - msg = 'QueryField({}={}) specified by Driver({}) is not supported. Allowed values are {}'.format( - str(query_field_name), str(query_field_value), str(driver_class_name), str(allowed_query_field_values)) + msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. 
Allowed values are {:s}' + msg = msg.format( + str(filter_field_name), str(filter_field_value), str(driver_class_name), + str(allowed_filter_field_values)) else: - msg = 'QueryField({}={}) specified in query is not supported. Allowed values are {}'.format( - str(query_field_name), str(query_field_value), str(allowed_query_field_values)) + msg = 'FilterField({:s}={:s}) specified in Filter is not supported. Allowed values are {:s}' + msg = msg.format(str(filter_field_name), str(filter_field_value), str(allowed_filter_field_values)) + super().__init__(msg) + +class DriverInstanceCacheTerminatedException(Exception): + def __init__(self): + msg = 'DriverInstanceCache is terminated. No new instances can be processed.' super().__init__(msg) diff --git a/src/device/service/driver_api/FilterFields.py b/src/device/service/driver_api/FilterFields.py new file mode 100644 index 0000000000000000000000000000000000000000..5c869c6553e6028659c82c1a7e9613e25bd85887 --- /dev/null +++ b/src/device/service/driver_api/FilterFields.py @@ -0,0 +1,25 @@ +from enum import Enum +from device.service.data_cache.database.DeviceModel import ORM_DeviceDriverEnum + +class DeviceTypeFilterFieldEnum(Enum): + EMULATED = 'emulated' + OPTICAL_ROADM = 'optical-roadm' + OPTICAL_TRANDPONDER = 'optical-trandponder' + PACKET_ROUTER = 'packet-router' + PACKET_SWITCH = 'packet-switch' + +class FilterFieldEnum(Enum): + DEVICE_TYPE = 'device_type' + DRIVER = 'driver' + VENDOR = 'vendor' + MODEL = 'model' + SERIAL_NUMBER = 'serial_number' + +# Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified +FILTER_FIELD_ALLOWED_VALUES = { + FilterFieldEnum.DEVICE_TYPE : {i.value for i in DeviceTypeFilterFieldEnum}, + FilterFieldEnum.DRIVER : {i.value for i in ORM_DeviceDriverEnum}, + FilterFieldEnum.VENDOR : None, + FilterFieldEnum.MODEL : None, + FilterFieldEnum.SERIAL_NUMBER : None, +} diff --git a/src/device/service/driver_api/QueryFields.py b/src/device/service/driver_api/QueryFields.py deleted file mode 100644 index 15b3f5b7582283083c9ea1080dc4f3d6f5390501..0000000000000000000000000000000000000000 --- a/src/device/service/driver_api/QueryFields.py +++ /dev/null @@ -1,33 +0,0 @@ -from enum import Enum - -class DeviceTypeQueryFieldEnum(Enum): - OPTICAL_ROADM = 'optical-roadm' - OPTICAL_TRANDPONDER = 'optical-trandponder' - PACKET_ROUTER = 'packet-router' - PACKET_SWITCH = 'packet-switch' - -class ProtocolQueryFieldEnum(Enum): - SOFTWARE = 'software' - GRPC = 'grpc' - RESTAPI = 'restapi' - NETCONF = 'netconf' - GNMI = 'gnmi' - RESTCONF = 'restconf' - -class DataModelQueryFieldEnum(Enum): - EMULATED = 'emu' - OPENCONFIG = 'oc' - P4 = 'p4' - TRANSPORT_API = 'tapi' - IETF_NETWORK_TOPOLOGY = 'ietf-netw-topo' - ONF_TR_352 = 'onf-tr-352' - -# Map allowed query fields to allowed values per query field. 
If no restriction (free text) None is specified -QUERY_FIELDS = { - 'device_type' : {i.value for i in DeviceTypeQueryFieldEnum}, - 'protocol' : {i.value for i in ProtocolQueryFieldEnum}, - 'data_model' : {i.value for i in DataModelQueryFieldEnum}, - 'vendor' : None, - 'model' : None, - 'serial_number': None, -} diff --git a/src/device/service/driver_api/_Driver.py b/src/device/service/driver_api/_Driver.py index 69ac93cb6a071a40aca25b28f930e390bfbfe6b0..34274159e98f48d38e24d71ba8858248acddec60 100644 --- a/src/device/service/driver_api/_Driver.py +++ b/src/device/service/driver_api/_Driver.py @@ -1,15 +1,15 @@ from typing import Any, Iterator, List, Tuple, Union class _Driver: - def __init__(self, address : str, port : int, **kwargs) -> None: + def __init__(self, address : str, port : int, **settings) -> None: """ Initialize Driver. Parameters: address : str The address of the device port : int The port of the device - **kwargs - Extra attributes can be configured using kwargs. + **settings + Extra settings required by the driver. """ raise NotImplementedError() @@ -29,17 +29,24 @@ class _Driver: """ raise NotImplementedError() - def GetConfig(self, resource_keys : List[str]) -> List[Union[Any, None, Exception]]: + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + """ Retrieve initial configuration of entire device. + Returns: + values : List[Tuple[str, Any]] + List of tuples (resource key, resource value) for resource keys. + """ + raise NotImplementedError() + + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: """ Retrieve running configuration of entire device, or selected resource keys. Parameters: resource_keys : List[str] List of keys pointing to the resources to be retrieved. Returns: - values : List[Union[Any, None, Exception]] - List of values for resource keys requested. Return values must be in the same order than resource - keys requested. If a resource is found, the appropriate value type must be retrieved, if a resource - is not found, None must be retrieved in the List for that resource. In case of Exception processing - a resource, the Exception must be retrieved. + values : List[Tuple[str, Union[Any, None, Exception]]] + List of tuples (resource key, resource value) for resource keys requested. If a resource is found, + the appropriate value type must be retrieved. If a resource is not found, None must be retrieved as + value for that resource. In case of Exception, the Exception must be retrieved as value. 
""" raise NotImplementedError() diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..3aa18038dabd96020c93be7a91caa499f8b904d6 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -0,0 +1,17 @@ +from ..driver_api.FilterFields import FilterFieldEnum, DeviceTypeFilterFieldEnum, ORM_DeviceDriverEnum +from .emulated import EmulatedDriver +#from .openconfig import OpenConfigDriver + +DRIVERS = [ + (EmulatedDriver, [ + { + FilterFieldEnum.DEVICE_TYPE: DeviceTypeFilterFieldEnum.EMULATED, + FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, + } + ]), + #(OpenConfigDriver, [ + # { + # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, + # } + #]), +] diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py index 7dc5757f7afb928bd56e64b5a87d4c414171cd6b..344d7830d2c0b4d1f477d2b3357ce50dfd9fe2f7 100644 --- a/src/device/service/drivers/emulated/EmulatedDriver.py +++ b/src/device/service/drivers/emulated/EmulatedDriver.py @@ -5,19 +5,20 @@ from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler -from common.Checkers import chk_float, chk_length, chk_string, chk_type +from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type from device.service.driver_api._Driver import _Driver from .AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value LOGGER = logging.getLogger(__name__) -def sample(resource_key : str, out_samples : queue.Queue): +def do_sampling(resource_key : str, out_samples : queue.Queue): out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random())) class EmulatedDriver(_Driver): - def __init__(self, address : str, port : int, **kwargs) -> None: + def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called self.__lock = threading.Lock() - self.__root = TreeNode('.') + self.__initial = TreeNode('.') + self.__running = TreeNode('.') self.__terminate = threading.Event() self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( @@ -39,10 +40,14 @@ class EmulatedDriver(_Driver): self.__scheduler.shutdown() return True - def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]: + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + with self.__lock: + return dump_subtree(self.__initial) + + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) with self.__lock: - if len(resource_keys) == 0: return dump_subtree(self.__root) + if len(resource_keys) == 0: return dump_subtree(self.__running) results = [] resolver = anytree.Resolver(pathattr='name') for i,resource_key in enumerate(resource_keys): @@ -50,14 +55,14 @@ class EmulatedDriver(_Driver): try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split('/') - except Exception as e: - LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key))) - results.append(e) # if validation fails, store the exception + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating 
{:s}: {:s}'.format(str_resource_name, str(resource_key))) + results.append((resource_key, e)) # if validation fails, store the exception continue - resource_node = get_subnode(resolver, self.__root, resource_path, default=None) + resource_node = get_subnode(resolver, self.__running, resource_path, default=None) # if not found, resource_node is None - results.append(None if resource_node is None else dump_subtree(resource_node)) + results.append((resource_key, None if resource_node is None else dump_subtree(resource_node))) return results def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: @@ -70,15 +75,15 @@ class EmulatedDriver(_Driver): str_resource_name = 'resources[#{:d}]'.format(i) try: chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, allowed_lengths=2) + chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key,resource_value = resource resource_path = resource_key.split('/') - except Exception as e: - LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key))) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue - set_subnode_value(resolver, self.__root, resource_path, resource_value) + set_subnode_value(resolver, self.__running, resource_path, resource_value) results.append(True) return results @@ -93,12 +98,12 @@ class EmulatedDriver(_Driver): try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split('/') - except Exception as e: - LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key))) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue - resource_node = get_subnode(resolver, self.__root, resource_path, default=None) + resource_node = get_subnode(resolver, self.__running, resource_path, default=None) # if not found, resource_node is None if resource_node is None: results.append(False) @@ -111,24 +116,24 @@ class EmulatedDriver(_Driver): results.append(True) return results - def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - chk_type('resources', resources, list) - if len(resources) == 0: return [] + def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) + for i,subscription in enumerate(subscriptions): + str_subscription_name = 'subscriptions[#{:d}]'.format(i) try: - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, allowed_lengths=3) - resource_key,sampling_duration,sampling_interval = resource - chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False) + chk_type(str_subscription_name, subscription, (list, tuple)) + chk_length(str_subscription_name, subscription, min_length=3, max_length=3) + resource_key,sampling_duration,sampling_interval = subscription + chk_string(str_subscription_name + 
'.resource_key', resource_key, allow_empty=False) resource_path = resource_key.split('/') - chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0) - chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0) - except Exception as e: - LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key))) + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue @@ -139,38 +144,38 @@ class EmulatedDriver(_Driver): job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) job = self.__scheduler.add_job( - sample, args=(resource_key, self.__out_samples), kwargs={}, + do_sampling, args=(resource_key, self.__out_samples), kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date, end_date=end_date, timezone=pytz.utc) subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] - set_subnode_value(resolver, self.__root, subscription_path, job) + set_subnode_value(resolver, self.__running, subscription_path, job) results.append(True) return results - def UnsubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - chk_type('resources', resources, list) - if len(resources) == 0: return [] + def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) + for i,resource in enumerate(subscriptions): + str_subscription_name = 'resources[#{:d}]'.format(i) try: - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, allowed_lengths=3) + chk_type(str_subscription_name, resource, (list, tuple)) + chk_length(str_subscription_name, resource, min_length=3, max_length=3) resource_key,sampling_duration,sampling_interval = resource - chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False) + chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) resource_path = resource_key.split('/') - chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0) - chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0) - except Exception as e: - LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key))) + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] - subscription_node = get_subnode(resolver, self.__root, subscription_path) + 
subscription_node = get_subnode(resolver, self.__running, subscription_path) # if not found, resource_node is None if subscription_node is None: @@ -179,7 +184,7 @@ class EmulatedDriver(_Driver): job : Job = getattr(subscription_node, 'value', None) if job is None or not isinstance(job, Job): - raise Exception('Malformed subscription node or wrong resource key: {}'.format(str(resource))) + raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource))) job.remove() parent = subscription_node.parent diff --git a/src/device/tests/test_unitary_driverapi.py b/src/device/tests/test_unitary_driverapi.py index ac7231cf76d4bcba0ea37da0cab781e21bc1c560..aa348c25ad794af190cf9ccf3e2a195fce946e1d 100644 --- a/src/device/tests/test_unitary_driverapi.py +++ b/src/device/tests/test_unitary_driverapi.py @@ -37,44 +37,47 @@ def device_driverapi_emulated(): yield _driver _driver.Disconnect() -def test_device_driverapi_emulated_setconfig(device_driverapi_emulated : EmulatedDriver): - # should work +def test_device_driverapi_emulated_setconfig( + device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name + results = device_driverapi_emulated.SetConfig(DEVICE_CONFIG_IF1) - LOGGER.info('results:\n{}'.format('\n'.join(map(str, results)))) + LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) assert len(results) == len(DEVICE_CONFIG_IF1) for result in results: assert isinstance(result, bool) and result results = device_driverapi_emulated.SetConfig(DEVICE_CONFIG_IF2) - LOGGER.info('results:\n{}'.format('\n'.join(map(str, results)))) + LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) assert len(results) == len(DEVICE_CONFIG_IF2) for result in results: assert isinstance(result, bool) and result -def test_device_driverapi_emulated_getconfig(device_driverapi_emulated : EmulatedDriver): +def test_device_driverapi_emulated_getconfig( + device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name + stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(DEVICE_CONFIG_IF2) for config_row in stored_config: assert (config_row in DEVICE_CONFIG_IF1) or (config_row in DEVICE_CONFIG_IF2) for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config for config_row in DEVICE_CONFIG_IF2: assert config_row in stored_config - # should work stored_config = device_driverapi_emulated.GetConfig([PATH_IF.format('IF2')]) - LOGGER.info('stored_config:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) assert len(stored_config) == 1 stored_config = stored_config[0] - LOGGER.info('stored_config[0]:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config[0]:\n{:s}'.format('\n'.join(map(str, stored_config)))) assert len(stored_config) == len(DEVICE_CONFIG_IF2) for config_row in stored_config: assert config_row in DEVICE_CONFIG_IF2 for config_row in DEVICE_CONFIG_IF2: assert config_row in stored_config -def test_device_driverapi_emulated_deleteconfig(device_driverapi_emulated : EmulatedDriver): - # should work +def test_device_driverapi_emulated_deleteconfig( + device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name + results = 
device_driverapi_emulated.DeleteConfig([PATH_ADDRIPV4.format('IF2', 0, '10.2.0.1')]) - LOGGER.info('results:\n{}'.format('\n'.join(map(str, results)))) + LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) assert (len(results) == 1) and isinstance(results[0], bool) and results[0] stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) device_config_if2 = list(filter(lambda row: '10.2.0.1' not in row[0], copy.deepcopy(DEVICE_CONFIG_IF2))) assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(device_config_if2) @@ -82,8 +85,9 @@ def test_device_driverapi_emulated_deleteconfig(device_driverapi_emulated : Emul for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config for config_row in device_config_if2: assert config_row in stored_config -def test_device_driverapi_emulated_subscriptions(device_driverapi_emulated : EmulatedDriver): - # should work +def test_device_driverapi_emulated_subscriptions( + device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name + duration = 10.0 interval = 1.5 results = device_driverapi_emulated.SubscribeState([ @@ -92,21 +96,21 @@ def test_device_driverapi_emulated_subscriptions(device_driverapi_emulated : Emu (DEVICE_STATE_IF2_TX_PKTS, duration, interval), (DEVICE_STATE_IF2_RX_PKTS, duration, interval), ]) - LOGGER.info('results:\n{}'.format('\n'.join(map(str, results)))) + LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) assert len(results) == 4 for result in results: assert isinstance(result, bool) and result stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) time.sleep(duration + 1.0) # let time to generate samples, plus 1 second extra time samples = [] for sample in device_driverapi_emulated.GetState(blocking=False): - LOGGER.info('sample: {}'.format(str(sample))) + LOGGER.info('sample: {:s}'.format(str(sample))) timestamp,resource_key,resource_value = sample samples.append((timestamp, resource_key, resource_value)) - LOGGER.info('samples:\n{}'.format('\n'.join(map(str, samples)))) + LOGGER.info('samples:\n{:s}'.format('\n'.join(map(str, samples)))) assert len(samples) == 4 * (math.floor(duration/interval) + 1) results = device_driverapi_emulated.UnsubscribeState([ @@ -115,12 +119,12 @@ def test_device_driverapi_emulated_subscriptions(device_driverapi_emulated : Emu (DEVICE_STATE_IF2_TX_PKTS, 10.0, 1.5), (DEVICE_STATE_IF2_RX_PKTS, 10.0, 1.5), ]) - LOGGER.info('results:\n{}'.format('\n'.join(map(str, results)))) + LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) assert len(results) == 4 for result in results: assert isinstance(result, bool) and result stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{}'.format('\n'.join(map(str, stored_config)))) + LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) device_config_if2 = list(filter(lambda row: '10.2.0.1' not in row[0], copy.deepcopy(DEVICE_CONFIG_IF2))) assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(device_config_if2) for config_row in stored_config: assert (config_row in DEVICE_CONFIG_IF1) or (config_row in device_config_if2)
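
Note on the reshaped _Driver API above: GetConfig() now returns (resource_key, resource_value) tuples rather than bare values, and the new GetInitialConfig() exposes the device's initial state separately from the running tree. The following minimal sketch exercises the emulated driver under these semantics; the resource keys and values are hypothetical examples, not keys defined by this patch:

    # Illustrative sketch only; resource keys/values below are made up for the example.
    from device.service.drivers.emulated import EmulatedDriver

    driver = EmulatedDriver('127.0.0.1', 0)    # address/port are placeholders for the emulator
    driver.Connect()                           # Connect()/Disconnect() come from the _Driver interface

    # SetConfig() takes (resource_key, resource_value) tuples and returns, per entry,
    # True on success or the Exception raised while validating/applying it.
    results = driver.SetConfig([
        ('/interfaces/interface[eth0]/mtu',     1500),   # hypothetical resource keys
        ('/interfaces/interface[eth0]/enabled', True),
    ])
    assert all(result is True for result in results)

    # GetConfig() now returns (resource_key, value) tuples: the stored subtree when the
    # key exists, None when it does not, or the Exception raised while processing it.
    for resource_key, value in driver.GetConfig(['/interfaces/interface[eth0]/mtu']):
        print(resource_key, value)

    # The initial (per-device) configuration is kept apart from the running tree.
    initial_config = driver.GetInitialConfig()

    driver.Disconnect()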
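
The new FilterFields.py pairs with the renamed exceptions in Exceptions.py: FILTER_FIELD_ALLOWED_VALUES declares which filter fields exist and, where a set is given, which values they admit (None means free text). The helper below is only a sketch of how that table could be enforced; validate_filter_fields() is a hypothetical name and assumes field values arrive as plain strings:

    # Hypothetical helper, not part of this patch.
    from device.service.driver_api.Exceptions import (
        UnsupportedFilterFieldException, UnsupportedFilterFieldValueException)
    from device.service.driver_api.FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum

    def validate_filter_fields(filter_fields : dict, driver_class_name : str = None) -> None:
        # Fields outside FILTER_FIELD_ALLOWED_VALUES are rejected outright.
        unsupported_fields = set(filter_fields.keys()) - set(FILTER_FIELD_ALLOWED_VALUES.keys())
        if len(unsupported_fields) > 0:
            raise UnsupportedFilterFieldException(unsupported_fields, driver_class_name=driver_class_name)
        for field_name, field_value in filter_fields.items():
            allowed_values = FILTER_FIELD_ALLOWED_VALUES[field_name]
            if allowed_values is None: continue            # free-text field
            if field_value in allowed_values: continue     # value permitted for this field
            raise UnsupportedFilterFieldValueException(
                field_name, field_value, allowed_values, driver_class_name=driver_class_name)

    validate_filter_fields({FilterFieldEnum.DEVICE_TYPE: 'packet-router'})     # passes
    # validate_filter_fields({FilterFieldEnum.DEVICE_TYPE: 'unknown-kind'})    # raises UnsupportedFilterFieldValueException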
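
drivers/__init__.py now registers every driver class together with the filter-field combinations it serves, and Exceptions.py provides UnsatisfiedFilterException / MultipleResultsForFilterException for lookups that match zero or several drivers. The selection routine itself is not part of this excerpt, so the function below is an illustrative sketch only (simple subset matching, with requested values expressed using the same enum members as the registry entries):

    # Sketch of a selection routine; the real factory logic is outside this patch.
    from device.service.driver_api.Exceptions import (
        MultipleResultsForFilterException, UnsatisfiedFilterException)
    from device.service.driver_api.FilterFields import DeviceTypeFilterFieldEnum, FilterFieldEnum
    from device.service.drivers import DRIVERS

    def select_driver_class(filter_fields : dict):
        candidate_classes = []
        for driver_class, supported_filter_field_sets in DRIVERS:
            for supported_filter_fields in supported_filter_field_sets:
                # subset match: every requested field must be declared with the same value
                if all(supported_filter_fields.get(field) == value for field, value in filter_fields.items()):
                    candidate_classes.append(driver_class)
                    break
        if len(candidate_classes) == 0:
            raise UnsatisfiedFilterException(filter_fields)
        if len(candidate_classes) > 1:
            raise MultipleResultsForFilterException(filter_fields, [c.__name__ for c in candidate_classes])
        return candidate_classes[0]

    # With the registry above, this resolves to EmulatedDriver:
    driver_class = select_driver_class({FilterFieldEnum.DEVICE_TYPE: DeviceTypeFilterFieldEnum.EMULATED})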
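
Finally, DriverInstanceCache.terminate() above shuts drivers down by popping each cached instance, calling Disconnect(), and re-inserting it when the disconnect fails so it can be retried. The reduced re-implementation below is only meant to make that teardown pattern easy to reason about in isolation; the class name and the bounded number of retry passes are illustrative additions, not part of the patch:

    # Reduced sketch of the disconnect-and-retry teardown; max_passes is an added bound.
    import logging, threading
    LOGGER = logging.getLogger(__name__)

    class TeardownSketch:
        def __init__(self):
            self._lock = threading.Lock()
            self._terminate = threading.Event()
            self._device_uuid__to__driver_instance = {}    # device_uuid -> driver instance

        def terminate(self, max_passes : int = 3):
            self._terminate.set()   # presumably makes get() raise DriverInstanceCacheTerminatedException
            with self._lock:
                for _ in range(max_passes):
                    if len(self._device_uuid__to__driver_instance) == 0: break
                    pending = dict(self._device_uuid__to__driver_instance)
                    self._device_uuid__to__driver_instance.clear()
                    for device_uuid, driver_instance in pending.items():
                        try:
                            driver_instance.Disconnect()
                        except:     # pylint: disable=bare-except
                            LOGGER.exception('Error disconnecting Driver({:s}); will retry'.format(str(device_uuid)))
                            self._device_uuid__to__driver_instance[device_uuid] = driver_instance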