diff --git a/manifests/.gitignore b/manifests/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..7ca2c51d3ca0ade4160bd94025769882bce3fb08 --- /dev/null +++ b/manifests/.gitignore @@ -0,0 +1,4 @@ +# Internal manifest used for local testings. + +# CTTC section: +cttc-ols.yaml diff --git a/run_tests_locally.sh b/run_tests_locally.sh index c3db583af420a63072a1122ee5fb8d2a3565fb0a..b483de8062cb884cbdda5530176a97c5dd3f8bfa 100755 --- a/run_tests_locally.sh +++ b/run_tests_locally.sh @@ -37,7 +37,6 @@ coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ context/tests/test_unitary.py coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - device/tests/test_unitary_driverapi.py \ device/tests/test_unitary.py coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ diff --git a/src/device/client/DeviceClient.py b/src/device/client/DeviceClient.py index 3841bf8cb4892dcc191d536061aea2bb1bd9d06d..7c5fa0ca2561a429d240b36f38357a874b882aa8 100644 --- a/src/device/client/DeviceClient.py +++ b/src/device/client/DeviceClient.py @@ -1,7 +1,7 @@ import grpc, logging from common.tools.client.RetryDecorator import retry, delay_exponential from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty -#from device.proto.device_pb2 import MonitoringSettings +from device.proto.device_pb2 import MonitoringSettings from device.proto.device_pb2_grpc import DeviceServiceStub LOGGER = logging.getLogger(__name__) @@ -54,9 +54,9 @@ class DeviceClient: LOGGER.debug('GetInitialConfig result: {:s}'.format(str(response))) return response - #@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') - #def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty: - # LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request))) - # response = self.stub.MonitorDeviceKpi(request) - # LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response))) - # return response + @retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty: + LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request))) + response = self.stub.MonitorDeviceKpi(request) + LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response))) + return response diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index ae0d5c8396157d5398751587f5e7d808b0d8f484..11cb8d3e39236002ee7ca6471c69112bbcfa059a 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -3,26 +3,29 @@ from concurrent import futures from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from common.orm.backend.BackendEnum import BackendEnum +from common.orm.Database import Database +from common.orm.Factory import get_database_backend from context.client.ContextClient import ContextClient from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server -from .DeviceServiceServicerImpl import DeviceServiceServicerImpl -#from .MonitoringLoops import MonitoringLoops +from monitoring.client.monitoring_client import MonitoringClient from .driver_api.DriverInstanceCache import DriverInstanceCache +from .DeviceServiceServicerImpl import DeviceServiceServicerImpl +from .MonitoringLoops import MonitoringLoops BIND_ADDRESS = '0.0.0.0' LOGGER = logging.getLogger(__name__) class DeviceService: def __init__( - self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache, - #monitoring_loops : MonitoringLoops, - address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): + self, context_client : ContextClient, monitoring_client : MonitoringClient, + driver_instance_cache : DriverInstanceCache, + address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): self.context_client = context_client + self.monitoring_client = monitoring_client self.driver_instance_cache = driver_instance_cache - #self.monitoring_loops = monitoring_loops self.address = address self.port = port self.endpoint = None @@ -33,18 +36,21 @@ class DeviceService: self.pool = None self.server = None + self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) + self.monitoring_loops = MonitoringLoops(monitoring_client, self.database) + def start(self): self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( str(self.endpoint), str(self.max_workers))) + self.monitoring_loops.start() + self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) self.device_servicer = DeviceServiceServicerImpl( - self.context_client, self.driver_instance_cache, - #self.monitoring_loops - ) + self.context_client, self.database, self.driver_instance_cache, self.monitoring_loops) add_DeviceServiceServicer_to_server(self.device_servicer, self.server) self.health_servicer = HealthServicer( @@ -63,4 +69,5 @@ class DeviceService: LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) self.health_servicer.enter_graceful_shutdown() self.server.stop(self.grace_period) + self.monitoring_loops.stop() LOGGER.debug('Service stopped') diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 6c559fda37b7e4f8798be3747261e559a3eb0953..e452d8fdd0cd8325d7167f90b072409cae2635bf 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -2,32 +2,30 @@ import grpc, json, logging from typing import Any, List, Tuple from google.protobuf.json_format import MessageToDict from common.orm.Database import Database -from common.orm.Factory import get_database_backend from common.orm.HighLevel import get_object, update_or_create_object -from common.orm.backend.BackendEnum import BackendEnum from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException from context.client.ContextClient import ContextClient +from context.proto.kpi_sample_types_pb2 import KpiSampleType from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty -#from device.proto.device_pb2 import MonitoringSettings +from device.proto.device_pb2 import MonitoringSettings from device.proto.device_pb2_grpc import DeviceServiceServicer -#from .MonitoringLoops import MonitoringLoops +from device.service.database.RelationModels import EndPointMonitorKpiModel +from .MonitoringLoops import MonitoringLoops from .database.ConfigModel import ( ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config) from .database.DatabaseTools import ( delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context, update_device_in_local_database) from .database.DeviceModel import DeviceModel, DriverModel -from .database.EndPointModel import EndPointModel -#from .database.KpiModel import KpiModel -#from .database.KpiSampleType import grpc_to_enum__kpi_sample_type -from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES +from .database.EndPointModel import EndPointModel, EndPointMonitorModel +from .database.KpiModel import KpiModel +from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type +from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from .driver_api.DriverInstanceCache import DriverInstanceCache from .driver_api.Tools import ( - check_delete_errors, check_set_errors, - #check_subscribe_errors, check_unsubscribe_errors -) + check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors) LOGGER = logging.getLogger(__name__) @@ -37,15 +35,14 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DeviceServiceServicerImpl(DeviceServiceServicer): def __init__( - self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache, - #monitoring_loops : MonitoringLoops - ): + self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache, + monitoring_loops : MonitoringLoops): LOGGER.debug('Creating Servicer...') self.context_client = context_client - self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) + self.database = database self.driver_instance_cache = driver_instance_cache - #self.monitoring_loops = monitoring_loops + self.monitoring_loops = monitoring_loops LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS, LOGGER) @@ -97,38 +94,57 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): driver_filter_fields = get_device_driver_filter_fields(db_device) - address = connection_config_rules.pop('address', None) - port = connection_config_rules.pop('port', None) + #LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules))) + address = connection_config_rules.pop('address', None) + port = connection_config_rules.pop('port', None) + settings = connection_config_rules.pop('settings', '{}') + try: + settings = json.loads(settings) + except ValueError as e: + raise InvalidArgumentException( + 'device.device_config.config_rules[settings]', settings, + extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e driver : _Driver = self.driver_instance_cache.get( - device_uuid, filter_fields=driver_filter_fields, address=address, port=port, - settings=connection_config_rules) + device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) driver.Connect() endpoints = driver.GetConfig([RESOURCE_ENDPOINTS]) - for _, resource_value in endpoints: - endpoint_uuid = resource_value.get('name') + #LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints))) + for resource_key, resource_value in endpoints: + endpoint_uuid = resource_value.get('uuid') endpoint_type = resource_value.get('type') str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) - update_or_create_object( + db_endpoint, _ = update_or_create_object( self.database, EndPointModel, str_endpoint_key, { 'device_fk' : db_device, 'endpoint_uuid': endpoint_uuid, 'endpoint_type': endpoint_type, + 'resource_key' : resource_key, }) - - running_config_rules = driver.GetConfig([RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES]) + sample_types = resource_value.get('sample_types', {}) + for sample_type, monitor_resource_key in sample_types.items(): + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, { + 'endpoint_fk' : db_endpoint, + 'resource_key' : monitor_resource_key, + 'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type), + }) + + running_config_rules = driver.GetConfig() running_config_rules = [ (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) for config_rule in running_config_rules ] #for running_config_rule in running_config_rules: # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) - update_config(self.database, device_uuid, 'running', running_config_rules) initial_config_rules = driver.GetInitialConfig() update_config(self.database, device_uuid, 'initial', initial_config_rules) + #LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump( + # include_config_rules=True, include_drivers=True, include_endpoints=True)))) + sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @@ -141,15 +157,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): context_config_rules = get_config_rules(self.database, device_uuid, 'running') context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules} - LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) + #LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules))) db_device,_ = update_device_in_local_database(self.database, request) request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules) - LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) + #LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules))) - resources_to_set : List[Tuple[str, Any]] = [] # key, value - resources_to_delete : List[Tuple[str, Any]] = [] # key, value + resources_to_set : List[Tuple[str, Any]] = [] # key, value + resources_to_delete : List[Tuple[str, Any]] = [] # key, value for config_rule in request_config_rules: action, key, value = config_rule @@ -160,8 +176,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): if key in context_config_rules: resources_to_delete.append((key, value)) - LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) - LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) + #LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set))) + #LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete))) # TODO: use of datastores (might be virtual ones) to enable rollbacks @@ -182,6 +198,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): if len(errors) > 0: raise OperationFailedException('ConfigureDevice', extra_details=errors) + running_config_rules = driver.GetConfig() + running_config_rules = [ + (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True)) + for config_rule in running_config_rules + ] + #for running_config_rule in running_config_rules: + # LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule))) + update_config(self.database, device_uuid, 'running', running_config_rules) + sync_device_to_context(db_device, self.context_client) return DeviceId(**db_device.dump_id()) @@ -196,8 +221,14 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): self.driver_instance_cache.delete(device_uuid) delete_device_from_context(db_device, self.context_client) + for db_kpi_pk,_ in db_device.references(KpiModel): + KpiModel(self.database, db_kpi_pk).delete() + for db_endpoint_pk,_ in db_device.references(EndPointModel): - EndPointModel(self.database, db_endpoint_pk).delete() + db_endpoint = EndPointModel(self.database, db_endpoint_pk) + for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel): + EndPointMonitorModel(self.database, db_endpoint_monitor_pk).delete() + db_endpoint.delete() for db_driver_pk,_ in db_device.references(DriverModel): DriverModel(self.database, db_driver_pk).delete() @@ -225,66 +256,136 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): config_rules = {} if db_device is None else db_device.dump_initial_config() return DeviceConfig(config_rules=config_rules) -# # Code under implemention and testing -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: -# kpi_uuid = request.kpi_id.kpi_id.uuid -# -# device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid -# db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) -# -# endpoint_id = request.kpi_descriptor.endpoint_id -# endpoint_uuid = endpoint_id.endpoint_uuid.uuid -# endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid -# if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid -# str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) -# endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid -# endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid -# if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: -# str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) -# str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') -# db_endpoint : EndPointModel = get_object( -# self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) -# -# #db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False) -# result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, { -# 'kpi_uuid' : request.kpi_id.kpi_id.uuid, -# 'kpi_description' : request.kpi_descriptor.kpi_description, -# 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type), -# 'device_fk' : db_device, -# 'endpoint_fk' : db_endpoint, -# 'sampling_duration': request.sampling_duration_s, -# 'sampling_interval': request.sampling_interval_s, -# }) -# db_kpi, updated = result -# -# driver : _Driver = self.driver_instance_cache.get(device_uuid) -# if driver is None: -# msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) -# raise OperationFailedException('ConfigureDevice', extra_details=msg) -# -# sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid) -# -# #resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval -# #resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval -# #LOGGER.info('[ConfigureDevice] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe))) -# #LOGGER.info('[ConfigureDevice] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe))) -# # TODO: Implement configuration of subscriptions -# -# #if len(errors) == 0: -# # results_subscribestate = driver.SubscribeState(resources_to_subscribe) -# # errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate)) -# -# #if len(errors) == 0: -# # results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) -# # errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate)) -# -# results = driver.SubscribeState([ -# (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval), -# ]) -# assert len(results) == 4 -# for result in results: assert isinstance(result, bool) and result -# -# self.monitoring_loops.add(device_uuid, driver) -# -# return Empty() + @safe_and_metered_rpc_method(METRICS, LOGGER) + def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty: + kpi_uuid = request.kpi_id.kpi_id.uuid + + subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0) + if subscribe: + device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid + + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + if db_device is None: + msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + endpoint_id = request.kpi_descriptor.endpoint_id + endpoint_uuid = endpoint_id.endpoint_uuid.uuid + str_endpoint_key = key_to_str([device_uuid, endpoint_uuid]) + endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid + endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid + if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: + str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) + str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') + db_endpoint : EndPointModel = get_object( + self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) + if db_endpoint is None: + msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format( + str(device_uuid), str(endpoint_uuid), str(str_endpoint_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + sample_type = request.kpi_descriptor.kpi_sample_type + + attributes = { + 'kpi_uuid' : request.kpi_id.kpi_id.uuid, + 'kpi_description' : request.kpi_descriptor.kpi_description, + 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type), + 'device_fk' : db_device, + 'endpoint_fk' : db_endpoint, + 'sampling_duration': request.sampling_duration_s, + 'sampling_interval': request.sampling_interval_s, + } + result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes) + db_kpi, updated = result + + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + db_endpoint_monitor : EndPointMonitorModel = get_object( + self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) + if db_endpoint_monitor is None: + msg = 'SampleType({:s}/{:s}) not supported for EndPoint({:s}).'.format( + str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')), + str(endpoint_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':') + attributes = { + 'endpoint_monitor_fk': db_endpoint_monitor, + 'kpi_fk' : db_kpi, + } + result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object( + self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes) + db_endpoint_monitor_kpi, updated = result + + resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval + resources_to_subscribe.append( + (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) + results_subscribestate = driver.SubscribeState(resources_to_subscribe) + errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate) + if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) + + self.monitoring_loops.add(device_uuid, driver) + + else: + db_kpi : KpiModel = get_object( + self.database, KpiModel, kpi_uuid, raise_if_not_found=False) + if db_kpi is None: + msg = 'Kpi({:s}) not found'.format(str(kpi_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + db_device : DeviceModel = get_object( + self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False) + if db_device is None: + msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + device_uuid = db_device.device_uuid + + db_endpoint : EndPointModel = get_object( + self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False) + if db_endpoint is None: + msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + endpoint_uuid = db_endpoint.endpoint_uuid + str_endpoint_key = db_endpoint.pk + + kpi_sample_type : ORM_KpiSampleType = db_kpi.kpi_sample_type + sample_type = kpi_sample_type.value + str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)]) + db_endpoint_monitor : EndPointMonitorModel = get_object( + self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False) + if db_endpoint_monitor is None: + msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':') + db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( + self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) + if db_endpoint_monitor_kpi is None: + msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval + resources_to_unsubscribe.append( + (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval)) + + driver : _Driver = self.driver_instance_cache.get(device_uuid) + if driver is None: + msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) + raise OperationFailedException('MonitorDeviceKpi', extra_details=msg) + + results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe) + errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate) + if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors) + + db_endpoint_monitor_kpi.delete() + db_kpi.delete() + + # There is one monitoring loop per device; keep them active since they are re-used by different monitoring + # requests. + #self.monitoring_loops.remove(device_uuid) + + return Empty() diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py index 658e1de0a99db161fabb88733182d3386b165cbd..2e96e8df1dfd7a050aae004b5d5a41bae469e438 100644 --- a/src/device/service/MonitoringLoops.py +++ b/src/device/service/MonitoringLoops.py @@ -1,83 +1,134 @@ -#import logging, queue, threading -#from typing import Dict -#from monitoring.client.monitoring_client import MonitoringClient -#from monitoring.proto.monitoring_pb2 import Kpi -#from .driver_api._Driver import _Driver -# -#LOGGER = logging.getLogger(__name__) -#QUEUE_GET_WAIT_TIMEOUT = 0.5 -# -#class MonitoringLoop: -# def __init__(self, driver : _Driver, samples_queue : queue.Queue) -> None: -# self._driver = driver -# self._samples_queue = samples_queue -# self._running = threading.Event() -# self._terminate = threading.Event() -# self._samples_stream = self._driver.GetState(blocking=True) -# self._collector_thread = threading.Thread(target=self._collect, daemon=False) -# -# def _collect(self) -> None: -# for sample in self._samples_stream: -# if self._terminate.is_set(): break -# LOGGER.info('[MonitoringLoop:_collect] sample={:s}'.format(str(sample))) -# # TODO: add timestamp (if not present) -# self._samples_queue.put_nowait(sample) -# -# def start(self): -# self._collector_thread.start() -# self._running.set() -# -# @property -# def is_running(self): return self._running.is_set() -# -# def stop(self): -# self._terminate.set() -# self._samples_stream.cancel() -# self._collector_thread.join() -# -#class MonitoringLoops: -# def __init__(self, monitoring_client : MonitoringClient) -> None: -# self._monitoring_client = monitoring_client -# self._samples_queue = queue.Queue() -# self._running = threading.Event() -# self._terminate = threading.Event() -# self._lock = threading.Lock() -# self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} -# self._exporter_thread = threading.Thread(target=self._export, daemon=False) -# -# def add(self, device_uuid : str, driver : _Driver) -> None: -# with self._lock: -# monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) -# if (monitoring_loop is not None) and monitoring_loop.is_running: return -# monitoring_loop = MonitoringLoop(driver, self._samples_queue) -# self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop -# monitoring_loop.start() -# -# def remove(self, device_uuid : str) -> None: -# with self._lock: -# monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) -# if monitoring_loop is None: return -# if monitoring_loop.is_running: monitoring_loop.stop() -# self._device_uuid__to__monitoring_loop.pop(device_uuid, None) -# -# def start(self): -# self._exporter_thread.start() -# self._running.set() -# -# @property -# def is_running(self): return self._running.is_set() -# -# def stop(self): -# self._terminate.set() -# self._exporter_thread.join() -# -# def _export(self) -> None: -# while not self._terminate.is_set(): -# try: -# sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) -# LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) -# except queue.Empty: -# continue -# # TODO: find in database the KpiId, format KPI and send to Monitoring -# kpi_data = {} -# self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) +import logging, queue, threading +from typing import Dict +from common.orm.Database import Database +from common.orm.HighLevel import get_object +from common.orm.backend.Tools import key_to_str +from device.service.database.RelationModels import EndPointMonitorKpiModel +from monitoring.client.monitoring_client import MonitoringClient +from monitoring.proto.monitoring_pb2 import Kpi +from .database.KpiModel import KpiModel +from .driver_api._Driver import _Driver + +LOGGER = logging.getLogger(__name__) +QUEUE_GET_WAIT_TIMEOUT = 0.5 + +class MonitoringLoop: + def __init__(self, device_uuid : str, driver : _Driver, samples_queue : queue.Queue) -> None: + self._device_uuid = device_uuid + self._driver = driver + self._samples_queue = samples_queue + self._running = threading.Event() + self._terminate = threading.Event() + self._samples_stream = self._driver.GetState(blocking=True) + self._collector_thread = threading.Thread(target=self._collect, daemon=True) + + def _collect(self) -> None: + for sample in self._samples_stream: + if self._terminate.is_set(): break + sample = (self._device_uuid, *sample) + self._samples_queue.put_nowait(sample) + + def start(self): + self._collector_thread.start() + self._running.set() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._samples_stream.cancel() + self._collector_thread.join() + +class MonitoringLoops: + def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None: + self._monitoring_client = monitoring_client + self._database = database + self._samples_queue = queue.Queue() + self._running = threading.Event() + self._terminate = threading.Event() + self._lock = threading.Lock() + self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} + self._exporter_thread = threading.Thread(target=self._export, daemon=True) + + def add(self, device_uuid : str, driver : _Driver) -> None: + with self._lock: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if (monitoring_loop is not None) and monitoring_loop.is_running: return + monitoring_loop = MonitoringLoop(device_uuid, driver, self._samples_queue) + self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop + monitoring_loop.start() + + def remove(self, device_uuid : str) -> None: + with self._lock: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if monitoring_loop is None: return + if monitoring_loop.is_running: monitoring_loop.stop() + self._device_uuid__to__monitoring_loop.pop(device_uuid, None) + + def start(self): + self._exporter_thread.start() + self._running.set() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._exporter_thread.join() + + def _export(self) -> None: + if self._database is None: + LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.') + return + + while not self._terminate.is_set(): + try: + sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) + #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) + except queue.Empty: + continue + + device_uuid, timestamp, endpoint_monitor_resource_key, value = sample + str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':') + + #db_entries = self._database.dump() + #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + #for db_entry in db_entries: + # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover + #LOGGER.info('-----------------------------------------------------------') + + db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object( + self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False) + if db_endpoint_monitor_kpi is None: + LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key)) + continue + + str_kpi_key = db_endpoint_monitor_kpi.kpi_fk + db_kpi : KpiModel = get_object( + self._database, KpiModel, str_kpi_key, raise_if_not_found=False) + if db_kpi is None: + LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key)) + continue + + if isinstance(value, int): + kpi_value_field_name = 'intVal' + kpi_value_field_cast = int + elif isinstance(value, float): + kpi_value_field_name = 'floatVal' + kpi_value_field_cast = float + elif isinstance(value, bool): + kpi_value_field_name = 'boolVal' + kpi_value_field_cast = bool + else: + kpi_value_field_name = 'stringVal' + kpi_value_field_cast = str + + try: + self._monitoring_client.IncludeKpi(Kpi(**{ + 'kpi_id' : {'kpi_id': {'uuid': db_kpi.kpi_uuid}}, + 'timestamp': str(timestamp), + 'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)} + })) + except: # pylint: disable=bare-except + LOGGER.exception('Unable to format/send Kpi') diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index 77572c51f9064712c2d9e4d9ccc9e943fe0df1c7..6832faf70b1d10f4796e50269ab018f0c6be147e 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -5,9 +5,9 @@ from context.client.ContextClient import ContextClient from device.Config import ( CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT, MONITORING_SERVICE_HOST, MONITORING_SERVICE_PORT) -#from monitoring.client.monitoring_client import MonitoringClient +from monitoring.client.monitoring_client import MonitoringClient from .DeviceService import DeviceService -#from .MonitoringLoops import MonitoringLoops +from .MonitoringLoops import MonitoringLoops from .driver_api.DriverFactory import DriverFactory from .driver_api.DriverInstanceCache import DriverInstanceCache from .drivers import DRIVERS @@ -49,31 +49,26 @@ def main(): str(context_service_host), str(context_service_port))) context_client = ContextClient(context_service_host, context_service_port) - ## Initialize Monitoring Client - #if monitoring_service_host is None or monitoring_service_port is None: - # raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format( - # str(monitoring_service_host), str(monitoring_service_port))) - #monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port) + # Initialize Monitoring Client + if monitoring_service_host is None or monitoring_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format( + str(monitoring_service_host), str(monitoring_service_port))) + monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port) # Initialize Driver framework driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) - #monitoring_loops = MonitoringLoops(monitoring_client) # Starting device service grpc_service = DeviceService( - context_client, driver_instance_cache, - #monitoring_loops, - port=grpc_service_port, max_workers=max_workers, + context_client, monitoring_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) grpc_service.start() - #monitoring_loops.start() # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass LOGGER.info('Terminating...') - #monitoring_loops.stop() grpc_service.stop() driver_instance_cache.terminate() diff --git a/src/device/service/database/EndPointModel.py b/src/device/service/database/EndPointModel.py index 38b87d6f37c4e99dd3790f4d8802acd03873f77d..da10a67e676ca990f24cc1455bf6acc0999c1cc5 100644 --- a/src/device/service/database/EndPointModel.py +++ b/src/device/service/database/EndPointModel.py @@ -1,10 +1,12 @@ import logging from typing import Dict +from common.orm.fields.EnumeratedField import EnumeratedField from common.orm.fields.ForeignKeyField import ForeignKeyField from common.orm.fields.PrimaryKeyField import PrimaryKeyField from common.orm.fields.StringField import StringField from common.orm.model.Model import Model from .DeviceModel import DeviceModel +from .KpiSampleType import ORM_KpiSampleType from .TopologyModel import TopologyModel LOGGER = logging.getLogger(__name__) @@ -15,6 +17,7 @@ class EndPointModel(Model): device_fk = ForeignKeyField(DeviceModel) endpoint_uuid = StringField(required=True, allow_empty=False) endpoint_type = StringField() + resource_key = StringField(required=True, allow_empty=False) def dump_id(self) -> Dict: device_id = DeviceModel(self.database, self.device_fk).dump_id() @@ -31,3 +34,9 @@ class EndPointModel(Model): 'endpoint_id': self.dump_id(), 'endpoint_type': self.endpoint_type, } + +class EndPointMonitorModel(Model): + pk = PrimaryKeyField() + endpoint_fk = ForeignKeyField(EndPointModel) + resource_key = StringField(required=True, allow_empty=False) + kpi_sample_type = EnumeratedField(ORM_KpiSampleType, required=True) diff --git a/src/device/service/database/KpiSampleType.py b/src/device/service/database/KpiSampleType.py index e5c4c5bbc0407e7dd3650ca4ff2f2e95a1202472..397b208a57537201cc891e18159c975edf7147a9 100644 --- a/src/device/service/database/KpiSampleType.py +++ b/src/device/service/database/KpiSampleType.py @@ -4,11 +4,11 @@ from device.proto.kpi_sample_types_pb2 import KpiSampleType from .Tools import grpc_to_enum class ORM_KpiSampleType(Enum): - UNKNOWN = KpiSampleType.UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.BYTES_RECEIVED + UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN + PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED + PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED + BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED grpc_to_enum__kpi_sample_type = functools.partial( grpc_to_enum, KpiSampleType, ORM_KpiSampleType) diff --git a/src/device/service/database/RelationModels.py b/src/device/service/database/RelationModels.py new file mode 100644 index 0000000000000000000000000000000000000000..6d1a9780f3bcd12d4aa3718c94e910b80a40ba18 --- /dev/null +++ b/src/device/service/database/RelationModels.py @@ -0,0 +1,13 @@ +import logging +from common.orm.fields.ForeignKeyField import ForeignKeyField +from common.orm.fields.PrimaryKeyField import PrimaryKeyField +from common.orm.model.Model import Model +from .EndPointModel import EndPointMonitorModel +from .KpiModel import KpiModel + +LOGGER = logging.getLogger(__name__) + +class EndPointMonitorKpiModel(Model): # pylint: disable=abstract-method + pk = PrimaryKeyField() + endpoint_monitor_fk = ForeignKeyField(EndPointMonitorModel) + kpi_fk = ForeignKeyField(KpiModel) diff --git a/src/device/service/driver_api/AnyTreeTools.py b/src/device/service/driver_api/AnyTreeTools.py index df61c7e030a13a3d0d758ce51a011aaa95deb49f..47e80e6c71cadd1fb24a6aecb0309f8194a06756 100644 --- a/src/device/service/driver_api/AnyTreeTools.py +++ b/src/device/service/driver_api/AnyTreeTools.py @@ -1,5 +1,6 @@ import anytree from typing import Any, List, Optional +from apscheduler.job import Job class TreeNode(anytree.node.Node): def __init__(self, name, parent=None, children=None, **kwargs) -> None: @@ -45,7 +46,10 @@ def set_subnode_value(resolver : anytree.Resolver, root : TreeNode, path : List[ node = resolver.get(node, path_item) except anytree.ChildResolverError: node = TreeNode(path_item, parent=node) - node.value = value + if isinstance(node.value, dict) and isinstance(value, dict): + node.value.update(value) + else: + node.value = value def dump_subtree(root : TreeNode): if not isinstance(root, TreeNode): raise Exception('root must be a TreeNode') @@ -56,5 +60,6 @@ def dump_subtree(root : TreeNode): if len(path) == 0: continue value = node.value if value is None: continue + if isinstance(value, Job): value = str(value) results.append((path, value)) return results diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py index 06dc4bd7c36e99d5f081f97e2caa46b8bb5fd2d6..ae273890e29678f5fba9f3b8c84be88c65b3e142 100644 --- a/src/device/service/drivers/emulated/EmulatedDriver.py +++ b/src/device/service/drivers/emulated/EmulatedDriver.py @@ -1,16 +1,52 @@ +import json import anytree, logging, pytz, queue, random, threading from datetime import datetime, timedelta -from typing import Any, Iterator, List, Optional, Tuple, Union +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type -from device.service.driver_api._Driver import _Driver +from device.service.database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type +from device.service.driver_api._Driver import ( + RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, + _Driver) from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value LOGGER = logging.getLogger(__name__) +SPECIAL_RESOURCE_MAPPINGS = { + RESOURCE_ENDPOINTS : '/endpoints', + RESOURCE_INTERFACES : '/interfaces', + RESOURCE_NETWORK_INSTANCES: '/net-instances', +} + +def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]: + endpoint_uuid = endpoint_data.get('uuid') + if endpoint_uuid is None: return None + endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) + endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) + + endpoint_type = endpoint_data.get('type') + if endpoint_type is None: return None + + endpoint_sample_types = endpoint_data.get('sample_types') + if endpoint_sample_types is None: return None + sample_types = {} + for endpoint_sample_type in endpoint_sample_types: + try: + kpi_sample_type : ORM_KpiSampleType = grpc_to_enum__kpi_sample_type(endpoint_sample_type) + except: # pylint: disable=bare-except + LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format( + str(endpoint_sample_type), str(endpoint_data))) + continue + metric_name = kpi_sample_type.name.lower() + monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name) + sample_types[endpoint_sample_type] = monitoring_resource_key + + endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types} + return endpoint_resource_key, endpoint_resource_value + def do_sampling(resource_key : str, out_samples : queue.Queue): out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random())) @@ -19,6 +55,15 @@ class EmulatedDriver(_Driver): self.__lock = threading.Lock() self.__initial = TreeNode('.') self.__running = TreeNode('.') + + endpoints = settings.get('endpoints', []) + endpoint_resources = [] + for endpoint in endpoints: + endpoint_resource = compose_resource_endpoint(endpoint) + if endpoint_resource is None: continue + endpoint_resources.append(endpoint_resource) + self.SetConfig(endpoint_resources) + self.__started = threading.Event() self.__terminate = threading.Event() self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events @@ -65,6 +110,7 @@ class EmulatedDriver(_Driver): str_resource_name = 'resource_key[#{:d}]'.format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) + resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key) resource_path = resource_key.split('/') except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) @@ -77,12 +123,6 @@ class EmulatedDriver(_Driver): results.extend(dump_subtree(resource_node)) return results - def GetResource(self, endpoint_uuid : str) -> Optional[str]: - chk_string('endpoint_uuid', endpoint_uuid) - return { - #'key': 'value', - }.get(endpoint_uuid) - def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] @@ -102,6 +142,11 @@ class EmulatedDriver(_Driver): results.append(e) # if validation fails, store the exception continue + try: + resource_value = json.loads(resource_value) + except: # pylint: disable=broad-except + pass + set_subnode_value(resolver, self.__running, resource_path, resource_value) results.append(True) return results diff --git a/src/device/service/drivers/openconfig/templates/EndPoints.py b/src/device/service/drivers/openconfig/templates/EndPoints.py index 4908c818374ca3b94fb77c5d5cd88d97d3c881b5..e6dd5ac87fb49dece5c1415abbdd3fb2058c5659 100644 --- a/src/device/service/drivers/openconfig/templates/EndPoints.py +++ b/src/device/service/drivers/openconfig/templates/EndPoints.py @@ -1,11 +1,13 @@ import logging, lxml.etree as ET from typing import Any, Dict, List, Tuple +from device.service.database.KpiSampleType import ORM_KpiSampleType from .Namespace import NAMESPACES -from .Tools import add_value_from_tag +from .Tools import add_value_from_collection, add_value_from_tag LOGGER = logging.getLogger(__name__) XPATH_PORTS = "//ocp:components/ocp:component/ocp:state[ocp:type='PORT']/.." +XPATH_INTERFACE_COUNTER = "//oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s}" def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: response = [] @@ -16,12 +18,20 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: component_name = xml_component.find('ocp:name', namespaces=NAMESPACES) if component_name is None or component_name.text is None: continue - add_value_from_tag(endpoint, 'name', component_name) + add_value_from_tag(endpoint, 'uuid', component_name) component_type = xml_component.find( 'ocpp:port/ocpp:breakout-mode/ocpp:state/ocpp:channel-speed', namespaces=NAMESPACES) add_value_from_tag(endpoint, 'type', component_type) + sample_types = { + ORM_KpiSampleType.BYTES_RECEIVED.value : XPATH_INTERFACE_COUNTER.format(endpoint['uuid'], 'in-octets' ), + ORM_KpiSampleType.BYTES_TRANSMITTED.value : XPATH_INTERFACE_COUNTER.format(endpoint['uuid'], 'out-octets'), + ORM_KpiSampleType.PACKETS_RECEIVED.value : XPATH_INTERFACE_COUNTER.format(endpoint['uuid'], 'in-pkts' ), + ORM_KpiSampleType.PACKETS_TRANSMITTED.value: XPATH_INTERFACE_COUNTER.format(endpoint['uuid'], 'out-pkts' ), + } + add_value_from_collection(endpoint, 'sample_types', sample_types) + if len(endpoint) == 0: continue - response.append(('endpoint[{:s}]'.format(endpoint['name']), endpoint)) + response.append(('endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) return response diff --git a/src/device/tests/.gitignore b/src/device/tests/.gitignore index 067c7b77db596a97883a03426735b6ede9c6fa48..5cb8b444d357c5e39eb31759d67b92fca7beabb2 100644 --- a/src/device/tests/.gitignore +++ b/src/device/tests/.gitignore @@ -1,2 +1,3 @@ # Add here your files containing confidential testbed details such as IP addresses, ports, usernames, passwords, etc. Device_OpenConfig_Infinera.py +Device_Transport_Api_CTTC.py diff --git a/src/device/tests/Device_Emulated.py b/src/device/tests/Device_Emulated.py index 155383b49fd12d780e901db1aa2614a55d4e5e14..7f097f1c4936a69fc530391558227ebbdfb65c0f 100644 --- a/src/device/tests/Device_Emulated.py +++ b/src/device/tests/Device_Emulated.py @@ -1,10 +1,12 @@ +import operator from copy import deepcopy from device.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum +from device.service.database.KpiSampleType import ORM_KpiSampleType from .Tools import config_rule_set, config_rule_delete # use "deepcopy" to prevent propagating forced changes during tests -DEVICE_EMU_UUID = 'EMULARED' +DEVICE_EMU_UUID = 'EMULATED' DEVICE_EMU_TYPE = 'emulated' DEVICE_EMU_ADDRESS = '127.0.0.1' DEVICE_EMU_PORT = '0' @@ -20,28 +22,80 @@ DEVICE_EMU = { 'device_endpoints': [], } +PACKET_PORT_SAMPLE_TYPES = [ + ORM_KpiSampleType.PACKETS_TRANSMITTED, + ORM_KpiSampleType.PACKETS_RECEIVED, + ORM_KpiSampleType.BYTES_TRANSMITTED, + ORM_KpiSampleType.BYTES_RECEIVED, +] + +ENDPOINT_UUIDS = ['EP1', 'EP2', 'EP3', 'EP4'] + +DEVICE_EMU_ENDPOINTS = [] +for endpoint_uuid in ENDPOINT_UUIDS: + DEVICE_EMU_ENDPOINTS.append((endpoint_uuid, '10Gbps', PACKET_PORT_SAMPLE_TYPES)) + +RSRC_EP = '/endpoints/endpoint[{:s}]' +RSRC_SUBIF = RSRC_EP + '/subinterfaces/subinterface[{:d}]' +RSRC_ADDRIPV4 = RSRC_SUBIF + '/ipv4/address[{:s}]' + +DEVICE_EMU_ENDPOINTS_COOKED = [] +for endpoint_uuid,endpoint_type,endpoint_sample_types in DEVICE_EMU_ENDPOINTS: + endpoint_resource_key = RSRC_EP.format(str(endpoint_uuid)) + sample_types = {} + for endpoint_sample_type in endpoint_sample_types: + sample_type_name = endpoint_sample_type.name.lower() + sample_types[endpoint_sample_type.value] = '{:s}/state/{:s}'.format(endpoint_resource_key, sample_type_name) + endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types} + DEVICE_EMU_ENDPOINTS_COOKED.append((endpoint_resource_key, endpoint_resource_value)) + DEVICE_EMU_CONNECT_RULES = [ config_rule_set('_connect/address', DEVICE_EMU_ADDRESS ), config_rule_set('_connect/port', DEVICE_EMU_PORT ), + config_rule_set('_connect/settings', {'endpoints': [ + { + 'uuid': endpoint_uuid, 'type': endpoint_type, + 'sample_types': list(map(operator.attrgetter('value'), endpoint_sample_types)), + } + for endpoint_uuid,endpoint_type,endpoint_sample_types in DEVICE_EMU_ENDPOINTS + ]}), ] -DEVICE_EMU_CONFIG_RULES = [ - config_rule_set('dev/rsrc1/value', 'value1'), - config_rule_set('dev/rsrc2/value', 'value2'), - config_rule_set('dev/rsrc3/value', 'value3'), -] +DEVICE_EMU_CONFIG_ENDPOINTS = [] +for endpoint_uuid in ENDPOINT_UUIDS: + DEVICE_EMU_CONFIG_ENDPOINTS.append(config_rule_set(RSRC_EP.format(endpoint_uuid), {'enabled' : True})) -DEVICE_EMU_RECONFIG_RULES = [ - config_rule_delete('dev/rsrc1/value', ''), - config_rule_set ('dev/rsrc10/value', 'value10'), - config_rule_set ('dev/rsrc11/value', 'value11'), - config_rule_set ('dev/rsrc12/value', 'value12'), -] +DEVICE_EMU_CONFIG_ADDRESSES = [] +for endpoint_uuid in ENDPOINT_UUIDS: + endpoint_number = int(endpoint_uuid.replace('EP', '')) + subinterface_index = 0 + subinterface_address = '10.{:d}.{:d}.1'.format(endpoint_number, subinterface_index) + subinterface_prefix_length = 24 + DEVICE_EMU_CONFIG_ADDRESSES.extend([ + config_rule_set(RSRC_SUBIF .format(endpoint_uuid, subinterface_index), { + 'index': subinterface_index}), + config_rule_set(RSRC_ADDRIPV4.format(endpoint_uuid, subinterface_index, subinterface_address), { + 'ip': subinterface_address, 'prefix_length': subinterface_prefix_length}), + ]) + +DEVICE_EMU_RECONFIG_ADDRESSES = [ + config_rule_delete(RSRC_SUBIF .format('EP2', 0 ), {}), + config_rule_delete(RSRC_ADDRIPV4.format('EP2', 0, '10.2.0.1'), {'ip': '10.2.0.1', 'prefix_length': 24}), -DEVICE_EMU_DECONFIG_RULES = [ - config_rule_delete('dev/rsrc2/value', 'value2'), - config_rule_delete('dev/rsrc3/value', 'value3'), - config_rule_delete('dev/rsrc10/value', 'value10'), - config_rule_delete('dev/rsrc11/value', 'value11'), - config_rule_delete('dev/rsrc12/value', 'value12'), + config_rule_set (RSRC_SUBIF .format('EP2', 1 ), {'index': 1}), + config_rule_set (RSRC_ADDRIPV4.format('EP2', 1, '10.2.1.1'), {'ip': '10.2.1.1', 'prefix_length': 24}), ] + +DEVICE_EMU_DECONFIG_ADDRESSES = [] +for endpoint_uuid in ENDPOINT_UUIDS: + endpoint_number = int(endpoint_uuid.replace('EP', '')) + subinterface_index = 1 if endpoint_uuid == 'EP2' else 0 + subinterface_address = '10.{:d}.{:d}.1'.format(endpoint_number, subinterface_index) + DEVICE_EMU_DECONFIG_ADDRESSES.extend([ + config_rule_delete(RSRC_SUBIF .format(endpoint_uuid, subinterface_index), {}), + config_rule_delete(RSRC_ADDRIPV4.format(endpoint_uuid, subinterface_index, subinterface_address), {}), + ]) + +DEVICE_EMU_DECONFIG_ENDPOINTS = [] +for endpoint_uuid in ENDPOINT_UUIDS: + DEVICE_EMU_DECONFIG_ENDPOINTS.append(config_rule_delete(RSRC_EP.format(endpoint_uuid), {})) diff --git a/src/device/tests/Device_OpenConfig_Template.py b/src/device/tests/Device_OpenConfig_Template.py index 73b6d4f55e8ffb08eb4c2e0badf8aa8d012c0d2b..5f917c5c4057727a64697251696fdeb0283ab7cd 100644 --- a/src/device/tests/Device_OpenConfig_Template.py +++ b/src/device/tests/Device_OpenConfig_Template.py @@ -4,13 +4,14 @@ from .Tools import config_rule_set, config_rule_delete # use "deepcopy" to prevent propagating forced changes during tests -DEVICE_OC_UUID = 'DEV2' -DEVICE_OC_TYPE = 'packet-router' -DEVICE_OC_ADDRESS = '127.0.0.1' # populate the Netconf Server IP address of the device to test -DEVICE_OC_PORT = '830' # populate the Netconf Server port of the device to test -DEVICE_OC_USERNAME = 'username' # populate the Netconf Server username of the device to test -DEVICE_OC_PASSWORD = 'password' # populate the Netconf Server password of the device to test -DEVICE_OC_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG] +DEVICE_OC_UUID = 'DEV2' +DEVICE_OC_TYPE = 'packet-router' +DEVICE_OC_ADDRESS = '127.0.0.1' # populate the Netconf Server IP address of the device to test +DEVICE_OC_PORT = '830' # populate the Netconf Server port of the device to test +DEVICE_OC_USERNAME = 'username' # populate the Netconf Server username of the device to test +DEVICE_OC_PASSWORD = 'password' # populate the Netconf Server password of the device to test +DEVICE_OC_TIMEOUT = 120 +DEVICE_OC_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG] DEVICE_OC_ID = {'device_uuid': {'uuid': DEVICE_OC_UUID}} DEVICE_OC = { @@ -23,10 +24,13 @@ DEVICE_OC = { } DEVICE_OC_CONNECT_RULES = [ - config_rule_set('_connect/address', DEVICE_OC_ADDRESS ), - config_rule_set('_connect/port', DEVICE_OC_PORT ), - config_rule_set('_connect/username', DEVICE_OC_USERNAME), - config_rule_set('_connect/password', DEVICE_OC_PASSWORD), + config_rule_set('_connect/address', DEVICE_OC_ADDRESS), + config_rule_set('_connect/port', DEVICE_OC_PORT ), + config_rule_set('_connect/settings', { + 'username': DEVICE_OC_USERNAME, + 'password': DEVICE_OC_PASSWORD, + 'timeout' : DEVICE_OC_TIMEOUT, + }), ] DEVICE_OC_CONFIG_RULES = [] # populate your configuration rules to test diff --git a/src/device/tests/MockMonitoringService.py b/src/device/tests/MockMonitoringService.py new file mode 100644 index 0000000000000000000000000000000000000000..6f51f0d8360f96ed183c11b336eb300fe3695a36 --- /dev/null +++ b/src/device/tests/MockMonitoringService.py @@ -0,0 +1,47 @@ +import grpc, logging +from concurrent import futures +from queue import Queue +from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server +from .MockMonitoringServiceServicerImpl import MockMonitoringServiceServicerImpl + +BIND_ADDRESS = '0.0.0.0' +LOGGER = logging.getLogger(__name__) + +class MockMonitoringService: + def __init__( + self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, + grace_period=GRPC_GRACE_PERIOD): + + self.queue_samples = Queue() + self.address = address + self.port = port + self.endpoint = None + self.max_workers = max_workers + self.grace_period = grace_period + self.monitoring_servicer = None + self.pool = None + self.server = None + + def start(self): + self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) + LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( + str(self.endpoint), str(self.max_workers))) + + self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) + self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) + + self.monitoring_servicer = MockMonitoringServiceServicerImpl(self.queue_samples) + add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) + + port = self.server.add_insecure_port(self.endpoint) + self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) + LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) + self.server.start() + + LOGGER.debug('Service started') + + def stop(self): + LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) + self.server.stop(self.grace_period) + LOGGER.debug('Service stopped') diff --git a/src/device/tests/MockMonitoringServiceServicerImpl.py b/src/device/tests/MockMonitoringServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..1787acdaa68fd80037ab1f4d14d7f86599f9ac14 --- /dev/null +++ b/src/device/tests/MockMonitoringServiceServicerImpl.py @@ -0,0 +1,15 @@ +import logging +from queue import Queue +from monitoring.proto.context_pb2 import Empty +from monitoring.proto.monitoring_pb2 import Kpi +from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceServicer + +LOGGER = logging.getLogger(__name__) + +class MockMonitoringServiceServicerImpl(MonitoringServiceServicer): + def __init__(self, queue_samples : Queue): + self.queue_samples = queue_samples + + def IncludeKpi(self, request : Kpi, context) -> Empty: + self.queue_samples.put(request) + return Empty() diff --git a/src/device/tests/Tools.py b/src/device/tests/Tools.py index 94a6d50900eb1865c69064b2e98bca0d6e91643b..2d8e99de30ebfcc4ed257a7bee512f4d416bd64c 100644 --- a/src/device/tests/Tools.py +++ b/src/device/tests/Tools.py @@ -13,9 +13,13 @@ def config_rule_set(resource_key : str, resource_value : Union[str, Dict[str, An def config_rule_delete(resource_key : str, resource_value : Union[str, Dict[str, Any]]): return config_rule(ConfigActionEnum.CONFIGACTION_DELETE, resource_key, resource_value) -def endpoint_id(topology_id, device_id, endpoint_uuid): - return {'topology_id': deepcopy(topology_id), 'device_id': deepcopy(device_id), - 'endpoint_uuid': {'uuid': endpoint_uuid}} +def endpoint_id(device_id, endpoint_uuid, topology_id=None): + result = {'device_id': deepcopy(device_id), 'endpoint_uuid': {'uuid': endpoint_uuid}} + if topology_id is not None: result['topology_id'] = deepcopy(topology_id) + return result -def endpoint(topology_id, device_id, endpoint_uuid, endpoint_type): - return {'endpoint_id': endpoint_id(topology_id, device_id, endpoint_uuid), 'endpoint_type': endpoint_type} +def endpoint(device_id, endpoint_uuid, endpoint_type, topology_id=None): + return { + 'endpoint_id': endpoint_id(device_id, endpoint_uuid, topology_id=topology_id), + 'endpoint_type': endpoint_type, + } diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py index fc9175620b7763cabc9da58756054fda9092490d..7c2ad83d52b80e0a9d41cd8d0cecf9a4539618ea 100644 --- a/src/device/tests/test_unitary.py +++ b/src/device/tests/test_unitary.py @@ -1,5 +1,7 @@ -import copy, grpc, logging, operator, os, pytest +import copy, grpc, json, logging, operator, os, pytest, time +from datetime import datetime from typing import Tuple +from queue import Queue, Empty from google.protobuf.json_format import MessageToDict from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum @@ -9,24 +11,31 @@ from context.Config import ( GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) from context.client.ContextClient import ContextClient -from context.proto.context_pb2 import DeviceId +from context.proto.context_pb2 import DeviceId, DeviceOperationalStatusEnum from context.service.grpc_server.ContextService import ContextService from device.Config import ( GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD) from device.client.DeviceClient import DeviceClient from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology +from device.proto.device_pb2 import MonitoringSettings +from device.proto.kpi_sample_types_pb2 import KpiSampleType from device.service.DeviceService import DeviceService -#from device.service.MonitoringLoops import MonitoringLoops from device.service.driver_api._Driver import _Driver from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS -#from monitoring.client.monitoring_client import MonitoringClient +from device.tests.MockMonitoringService import MockMonitoringService +from device.tests.Tools import endpoint_id +from monitoring.Config import ( + GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS, + GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD) +from monitoring.client.monitoring_client import MonitoringClient from .CommonObjects import CONTEXT, TOPOLOGY from .Device_Emulated import ( - DEVICE_EMU, DEVICE_EMU_CONFIG_RULES, DEVICE_EMU_CONNECT_RULES, DEVICE_EMU_DECONFIG_RULES, DEVICE_EMU_ID, - DEVICE_EMU_RECONFIG_RULES, DEVICE_EMU_UUID) + DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES, + DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_ENDPOINTS, DEVICE_EMU_ENDPOINTS_COOKED, + DEVICE_EMU_ID, DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID) try: from .Device_OpenConfig_Infinera import( DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID, @@ -40,11 +49,13 @@ except ImportError: # DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID, # DEVICE_OC_UUID) + LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports +MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' DEFAULT_REDIS_SERVICE_PORT = 6379 @@ -87,18 +98,32 @@ def context_client(context_service : ContextService): # pylint: disable=redefine _client.close() @pytest.fixture(scope='session') -def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name +def monitoring_service(): + _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS, + grace_period=MONITORING_GRPC_GRACE_PERIOD) + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name + _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT) + #yield _client + #_client.close() + return _client + +@pytest.fixture(scope='session') +def device_service( + context_client : ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name + _driver_factory = DriverFactory(DRIVERS) _driver_instance_cache = DriverInstanceCache(_driver_factory) - #_monitoring_loops = MonitoringLoops(None) # TODO: replace by monitoring client - #_monitoring_loops.start() _service = DeviceService( - context_client, _driver_instance_cache, - #_monitoring_loops, - port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) + context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, + max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) _service.start() yield _service - #_monitoring_loops.stop() _service.stop() @pytest.fixture(scope='session') @@ -122,6 +147,9 @@ def test_prepare_environment( # ----- Test Device Driver Emulated ------------------------------------------------------------------------------------ +# Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other +# Drivers might support a different set of resource paths, and attributes/values per resource; however, they must +# implement the Driver API. def test_device_emulated_add_error_cases( context_client : ContextClient, # pylint: disable=redefined-outer-name @@ -131,7 +159,7 @@ def test_device_emulated_add_error_cases( with pytest.raises(grpc.RpcError) as e: DEVICE_EMU_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_EMU) DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) - DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_RULES) + DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) device_client.AddDevice(Device(**DEVICE_EMU_WITH_EXTRA_RULES)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'device.device_config.config_rules([' @@ -159,10 +187,10 @@ def test_device_emulated_get( device_service : DeviceService): # pylint: disable=redefined-outer-name initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID)) - LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) + #LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) - LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + #LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) def test_device_emulated_configure( @@ -174,61 +202,193 @@ def test_device_emulated_configure( assert driver is not None driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == 0 + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + assert endpoint_cooked in driver_config + + DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_RULES) + DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ADDRESSES) device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) + DEVICE_EMU_WITH_OPERATIONAL_STATUS = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_OPERATIONAL_STATUS['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_OPERATIONAL_STATUS)) + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == 3 - assert driver_config[0] == ('/dev/rsrc1/value', 'value1') - assert driver_config[1] == ('/dev/rsrc2/value', 'value2') - assert driver_config[2] == ('/dev/rsrc3/value', 'value3') + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + endpoint_cooked = copy.deepcopy(endpoint_cooked) + endpoint_cooked[1]['enabled'] = True + assert endpoint_cooked in driver_config + for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: + assert (config_rule['resource_key'], json.loads(config_rule['resource_value'])) in driver_config device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + assert device_data.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + config_rules = [ (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) for config_rule in device_data.device_config.config_rules ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in DEVICE_EMU_CONFIG_RULES: + #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + RESULTING_CONFIG_ENDPOINTS = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + for config_rule in RESULTING_CONFIG_ENDPOINTS.values(): config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], + json.loads(json.dumps(config_rule['resource_value']))) + assert config_rule in config_rules + for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], + json.loads(json.dumps(config_rule['resource_value']))) assert config_rule in config_rules # Try to reconfigure... DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_RULES) + DEVICE_EMU_WITH_RECONFIG_RULES['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_ADDRESSES) device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_RECONFIG_RULES)) + RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + RESULTING_CONFIG_RULES.update({cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ADDRESSES)}) + for reconfig_rule in DEVICE_EMU_RECONFIG_ADDRESSES: + if reconfig_rule['action'] == ConfigActionEnum.CONFIGACTION_DELETE: + RESULTING_CONFIG_RULES.pop(reconfig_rule['resource_key'], None) + else: + RESULTING_CONFIG_RULES[reconfig_rule['resource_key']] = reconfig_rule + RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() + #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == 5 - assert driver_config[0] == ('/dev/rsrc10/value', 'value10') - assert driver_config[1] == ('/dev/rsrc11/value', 'value11') - assert driver_config[2] == ('/dev/rsrc12/value', 'value12') - assert driver_config[3] == ('/dev/rsrc2/value', 'value2') - assert driver_config[4] == ('/dev/rsrc3/value', 'value3') + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(RESULTING_CONFIG_RULES) + for config_rule in RESULTING_CONFIG_RULES: + resource = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] + assert resource in driver_config device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) config_rules = [ (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) for config_rule in device_data.device_config.config_rules ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - final_config_rules = DEVICE_EMU_CONFIG_RULES[1:] + DEVICE_EMU_RECONFIG_RULES[1:] # remove '/dev/rsrc1/value' - for config_rule in final_config_rules: + #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in RESULTING_CONFIG_RULES: config_rule = ( ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) assert config_rule in config_rules +def test_device_emulated_monitor( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService, # pylint: disable=redefined-outer-name + monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name + + #device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + #LOGGER.info('device_data = \n{:s}'.format(str(device_data))) + + driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now + assert driver is not None + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) + + SAMPLING_DURATION_SEC = 3.0 + SAMPLING_INTERVAL_SEC = 0.5 + + MONITORING_SETTINGS_LIST = [] + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {} + for endpoint_uuid,_,sample_types in DEVICE_EMU_ENDPOINTS: + for sample_type in sample_types: + sample_type_id = sample_type.value + sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '') + kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(DEVICE_EMU_UUID, endpoint_uuid, str(sample_type_id)) + monitoring_settings = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'kpi_descriptor': { + 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format( + sample_type_name, endpoint_uuid, DEVICE_EMU_UUID), + 'kpi_sample_type': sample_type_id, + 'device_id': DEVICE_EMU_ID, + 'endpoint_id': endpoint_id(DEVICE_EMU_ID, endpoint_uuid), + }, + 'sampling_duration_s': SAMPLING_DURATION_SEC, + 'sampling_interval_s': SAMPLING_INTERVAL_SEC, + } + MONITORING_SETTINGS_LIST.append(monitoring_settings) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0 + + NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC + NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI + + # Start monitoring the device + t_start_monitoring = datetime.timestamp(datetime.utcnow()) + for monitoring_settings in MONITORING_SETTINGS_LIST: + device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings)) + + # wait to receive the expected number of samples + # if takes more than 1.5 times the sampling duration, assume there is an error + time_ini = time.time() + queue_samples : Queue = monitoring_service.queue_samples + received_samples = [] + while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5): + try: + received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED) + #LOGGER.info('received_sample = {:s}'.format(str(received_sample))) + received_samples.append(received_sample) + except Empty: + continue + + t_end_monitoring = datetime.timestamp(datetime.utcnow()) + + LOGGER.info('received_samples = {:s}'.format(str(received_samples))) + assert len(received_samples) == NUM_SAMPLES_EXPECTED + for received_sample in received_samples: + kpi_uuid = received_sample.kpi_id.kpi_id.uuid + assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED + assert isinstance(received_sample.timestamp, str) + timestamp = float(received_sample.timestamp) + assert timestamp > t_start_monitoring + assert timestamp < t_end_monitoring + assert received_sample.kpi_value.HasField('floatVal') + assert isinstance(received_sample.kpi_value.floatVal, float) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1 + + LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED))) + for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items(): + assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI + + # Unsubscribe monitoring + for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.keys(): + MONITORING_SETTINGS_UNSUBSCRIBE = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'sampling_duration_s': -1, # negative value in sampling_duration_s or in sampling_interval_s means unsibscribe + 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe + } + device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE)) + + def test_device_emulated_deconfigure( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name @@ -238,14 +398,37 @@ def test_device_emulated_deconfigure( assert driver is not None driver_config = driver.GetConfig() - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_RULES) + DEVICE_EMU_WITH_DECONFIG_RULES['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED + DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ADDRESSES) device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) + RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(RESULTING_CONFIG_RULES) + #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) + for config_rule in RESULTING_CONFIG_RULES: + config_rule = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] + #LOGGER.info('config_rule = {:s}'.format(str(config_rule))) + assert config_rule in driver_config + + DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ENDPOINTS) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) assert len(driver_config) == 0 device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) diff --git a/src/device/tests/test_unitary_driverapi.py b/src/device/tests/test_unitary_driverapi.py deleted file mode 100644 index 027e7775eae4a3f7a19c056266e1fc807b09cf2d..0000000000000000000000000000000000000000 --- a/src/device/tests/test_unitary_driverapi.py +++ /dev/null @@ -1,129 +0,0 @@ -import copy, logging, math, pytest, time -from device.service.drivers.emulated.EmulatedDriver import EmulatedDriver - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - -PATH_IF = '/interfaces/interface[name="{}"]' -PATH_SUBIF = PATH_IF + '/subinterfaces/subinterface[index="{}"]' -PATH_ADDRIPV4 = PATH_SUBIF + '/ipv4/address[ip="{}"]' - -DEVICE_CONFIG_IF1 = [] -DEVICE_CONFIG_IF1.append((PATH_IF .format('IF1' ) + '/config/name', "IF1" )) -DEVICE_CONFIG_IF1.append((PATH_IF .format('IF1' ) + '/config/enabled', True )) -DEVICE_CONFIG_IF1.append((PATH_SUBIF .format('IF1', 0 ) + '/config/index', 0 )) -DEVICE_CONFIG_IF1.append((PATH_ADDRIPV4.format('IF1', 0, '10.1.0.1') + '/config/ip', "10.1.0.1")) -DEVICE_CONFIG_IF1.append((PATH_ADDRIPV4.format('IF1', 0, '10.1.0.1') + '/config/prefix_length', 24 )) - -DEVICE_CONFIG_IF2 = [] -DEVICE_CONFIG_IF2.append((PATH_IF .format('IF2' ) + '/config/name', "IF2" )) -DEVICE_CONFIG_IF2.append((PATH_IF .format('IF2' ) + '/config/enabled', True )) -DEVICE_CONFIG_IF2.append((PATH_SUBIF .format('IF2', 0 ) + '/config/index', 0 )) -DEVICE_CONFIG_IF2.append((PATH_ADDRIPV4.format('IF2', 0, '10.2.0.1') + '/config/ip', "10.2.0.1")) -DEVICE_CONFIG_IF2.append((PATH_ADDRIPV4.format('IF2', 0, '10.2.0.1') + '/config/prefix_length', 24 )) - -PATH_IF_TX_PKTS = PATH_IF + 'state/tx_packets_per_second' -PATH_IF_RX_PKTS = PATH_IF + 'state/rx_packets_per_second' - -DEVICE_STATE_IF1_TX_PKTS = PATH_IF_TX_PKTS.format('IF1') -DEVICE_STATE_IF1_RX_PKTS = PATH_IF_RX_PKTS.format('IF1') -DEVICE_STATE_IF2_TX_PKTS = PATH_IF_TX_PKTS.format('IF2') -DEVICE_STATE_IF2_RX_PKTS = PATH_IF_RX_PKTS.format('IF2') - -@pytest.fixture(scope='session') -def device_driverapi_emulated(): - _driver = EmulatedDriver('127.0.0.1', 0) - _driver.Connect() - yield _driver - _driver.Disconnect() - -def test_device_driverapi_emulated_setconfig( - device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name - - results = device_driverapi_emulated.SetConfig(DEVICE_CONFIG_IF1) - LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) - assert len(results) == len(DEVICE_CONFIG_IF1) - for result in results: assert isinstance(result, bool) and result - - results = device_driverapi_emulated.SetConfig(DEVICE_CONFIG_IF2) - LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) - assert len(results) == len(DEVICE_CONFIG_IF2) - for result in results: assert isinstance(result, bool) and result - -def test_device_driverapi_emulated_getconfig( - device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name - - stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) - assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(DEVICE_CONFIG_IF2) - for config_row in stored_config: assert (config_row in DEVICE_CONFIG_IF1) or (config_row in DEVICE_CONFIG_IF2) - for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config - for config_row in DEVICE_CONFIG_IF2: assert config_row in stored_config - - stored_config = device_driverapi_emulated.GetConfig([PATH_IF.format('IF2')]) - LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) - assert len(stored_config) == len(DEVICE_CONFIG_IF2) - for config_row in stored_config: assert config_row in DEVICE_CONFIG_IF2 - for config_row in DEVICE_CONFIG_IF2: assert config_row in stored_config - -def test_device_driverapi_emulated_deleteconfig( - device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name - - results = device_driverapi_emulated.DeleteConfig([(PATH_ADDRIPV4.format('IF2', 0, '10.2.0.1'), '')]) - LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) - assert (len(results) == 1) and isinstance(results[0], bool) and results[0] - - stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) - - device_config_if2 = list(filter(lambda row: '10.2.0.1' not in row[0], copy.deepcopy(DEVICE_CONFIG_IF2))) - assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(device_config_if2) - for config_row in stored_config: assert (config_row in DEVICE_CONFIG_IF1) or (config_row in device_config_if2) - for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config - for config_row in device_config_if2: assert config_row in stored_config - -def test_device_driverapi_emulated_subscriptions( - device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name - - duration = 10.0 - interval = 1.5 - results = device_driverapi_emulated.SubscribeState([ - (DEVICE_STATE_IF1_TX_PKTS, duration, interval), - (DEVICE_STATE_IF1_RX_PKTS, duration, interval), - (DEVICE_STATE_IF2_TX_PKTS, duration, interval), - (DEVICE_STATE_IF2_RX_PKTS, duration, interval), - ]) - LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) - assert len(results) == 4 - for result in results: assert isinstance(result, bool) and result - - stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) - - time.sleep(duration + 1.0) # let time to generate samples, plus 1 second extra time - - samples = [] - for sample in device_driverapi_emulated.GetState(blocking=False): - LOGGER.info('sample: {:s}'.format(str(sample))) - timestamp,resource_key,resource_value = sample - samples.append((timestamp, resource_key, resource_value)) - LOGGER.info('samples:\n{:s}'.format('\n'.join(map(str, samples)))) - assert len(samples) == 4 * (math.floor(duration/interval) + 1) - - results = device_driverapi_emulated.UnsubscribeState([ - (DEVICE_STATE_IF1_TX_PKTS, 10.0, 1.5), - (DEVICE_STATE_IF1_RX_PKTS, 10.0, 1.5), - (DEVICE_STATE_IF2_TX_PKTS, 10.0, 1.5), - (DEVICE_STATE_IF2_RX_PKTS, 10.0, 1.5), - ]) - LOGGER.info('results:\n{:s}'.format('\n'.join(map(str, results)))) - assert len(results) == 4 - for result in results: assert isinstance(result, bool) and result - - stored_config = device_driverapi_emulated.GetConfig() - LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config)))) - device_config_if2 = list(filter(lambda row: '10.2.0.1' not in row[0], copy.deepcopy(DEVICE_CONFIG_IF2))) - assert len(stored_config) == len(DEVICE_CONFIG_IF1) + len(device_config_if2) - for config_row in stored_config: assert (config_row in DEVICE_CONFIG_IF1) or (config_row in device_config_if2) - for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config - for config_row in device_config_if2: assert config_row in stored_config diff --git a/src/device/tests/test_unitary_service.py b/src/device/tests/test_unitary_service.py deleted file mode 100644 index 8d9591dd6492395a44adc25e4a54eaebc8ff9121..0000000000000000000000000000000000000000 --- a/src/device/tests/test_unitary_service.py +++ /dev/null @@ -1,270 +0,0 @@ -import copy, grpc, logging, pytest -from google.protobuf.json_format import MessageToDict -from common.database.Factory import get_database, DatabaseEngineEnum -from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID -from common.database.api.context.topology.device.OperationalStatus import OperationalStatus -from common.tests.Assertions import validate_device_id, validate_empty -from device.client.DeviceClient import DeviceClient -from device.proto.context_pb2 import Device, DeviceId -from device.service.DeviceService import DeviceService -from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD - -port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - -# use "copy.deepcopy" to prevent propagating forced changes during tests -CONTEXT_ID = {'contextUuid': {'uuid': DEFAULT_CONTEXT_ID}} -TOPOLOGY_ID = {'contextId': copy.deepcopy(CONTEXT_ID), 'topoId': {'uuid': DEFAULT_TOPOLOGY_ID}} -DEVICE_ID = {'device_id': {'uuid': 'DEV1'}} -DEVICE = { - 'device_id': copy.deepcopy(DEVICE_ID), - 'device_type': 'ROADM', - 'device_config': {'device_config': '<config/>'}, - 'devOperationalStatus': OperationalStatus.ENABLED.value, - 'endpointList' : [ - { - 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP2'}}, - 'port_type': 'WDM' - }, - { - 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP3'}}, - 'port_type': 'WDM' - }, - { - 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP4'}}, - 'port_type': 'WDM' - }, - ] -} - -@pytest.fixture(scope='session') -def device_database(): - _database = get_database(engine=DatabaseEngineEnum.INMEMORY) - return _database - -@pytest.fixture(scope='session') -def device_service(device_database): - _service = DeviceService( - device_database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def device_client(device_service): - _client = DeviceClient(address='127.0.0.1', port=port) - yield _client - _client.close() - -def test_add_device_wrong_attributes(device_client : DeviceClient): - # should fail with device uuid is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_id']['device_id']['uuid'] = '' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'device.device_id.device_id.uuid() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - - # should fail with device type is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_type'] = '' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'device.device_type() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - - # should fail with wrong device operational status - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Method(AddDevice) does not accept OperationalStatus(KEEP_STATE). '\ - 'Permitted values for Method(AddDevice) are OperationalStatus([\'DISABLED\', \'ENABLED\']).' - assert e.value.details() == msg - -def test_add_device_wrong_endpoint(device_client : DeviceClient): - # should fail with unsupported endpoint context - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context' - request = Device(**copy_device) - device_client.AddDevice(request) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Context(wrong-context) in Endpoint(#0) of '\ - 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Contexts({\'admin\'}). '\ - 'Optionally, leave field empty to use predefined Context(admin).' - assert e.value.details() == msg - - # should fail with unsupported endpoint topology - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = 'wrong-topo' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\ - 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Topologies({\'admin\'}). '\ - 'Optionally, leave field empty to use predefined Topology(admin).' - assert e.value.details() == msg - - # should fail with wrong endpoint device - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = 'wrong-device' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\ - 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Devices({\'DEV1\'}). '\ - 'Optionally, leave field empty to use predefined Device(DEV1).' - assert e.value.details() == msg - - # should fail with endpoint port uuid is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_id']['port_id']['uuid'] = '' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - - # should fail with endpoint port type is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_type'] = '' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'endpoint[#0].port_type() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - - # should fail with duplicate port in device - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][1]['port_id']['port_id']['uuid'] = 'EP2' - device_client.AddDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1)/Port(EP2) in Endpoint(#1) of '\ - 'Context(admin)/Topology(admin)/Device(DEV1).' - assert e.value.details() == msg - -def test_add_device(device_client : DeviceClient): - # should work - validate_device_id(MessageToDict( - device_client.AddDevice(Device(**DEVICE)), - including_default_value_fields=True, preserving_proto_field_name=True, - use_integers_for_enums=False)) - -def test_add_device_duplicate(device_client : DeviceClient): - # should fail with device already exists - with pytest.raises(grpc._channel._InactiveRpcError) as e: - device_client.AddDevice(Device(**DEVICE)) - assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS - msg = 'Context(admin)/Topology(admin)/Device(DEV1) already exists in the database.' - assert e.value.details() == msg - -def test_delete_device_empty_uuid(device_client : DeviceClient): - # should fail with device uuid is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device_id = copy.deepcopy(DEVICE_ID) - copy_device_id['device_id']['uuid'] = '' - device_client.DeleteDevice(DeviceId(**copy_device_id)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'device_id.device_id.uuid() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - -def test_delete_device_not_found(device_client : DeviceClient): - # should fail with device not found - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device_id = copy.deepcopy(DEVICE_ID) - copy_device_id['device_id']['uuid'] = 'wrong-device-id' - device_client.DeleteDevice(DeviceId(**copy_device_id)) - assert e.value.code() == grpc.StatusCode.NOT_FOUND - msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.' - assert e.value.details() == msg - -def test_delete_device(device_client : DeviceClient): - # should work - validate_empty(MessageToDict( - device_client.DeleteDevice(DeviceId(**DEVICE_ID)), - including_default_value_fields=True, preserving_proto_field_name=True, - use_integers_for_enums=False)) - -def test_configure_device_empty_device_uuid(device_client : DeviceClient): - # should fail with device uuid is empty - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_id']['device_id']['uuid'] = '' - device_client.ConfigureDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'device.device_id.device_id.uuid() is out of range: '\ - 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).' - assert e.value.details() == msg - -def test_configure_device_not_found(device_client : DeviceClient): - # should fail with device not found - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_id']['device_id']['uuid'] = 'wrong-device-id' - device_client.ConfigureDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.NOT_FOUND - msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.' - assert e.value.details() == msg - -def test_add_device_default_endpoint_context_topology_device(device_client : DeviceClient): - # should work - copy_device = copy.deepcopy(DEVICE) - copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = '' - copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = '' - copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = '' - validate_device_id(MessageToDict( - device_client.AddDevice(Device(**copy_device)), - including_default_value_fields=True, preserving_proto_field_name=True, - use_integers_for_enums=False)) - -def test_configure_device_wrong_attributes(device_client : DeviceClient): - # should fail with device type is wrong - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_type'] = 'wrong-type' - device_client.ConfigureDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'Device(DEV1) has Type(ROADM) in the database. Cannot be changed to Type(wrong-type).' - assert e.value.details() == msg - - # should fail with endpoints cannot be modified - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - device_client.ConfigureDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - assert e.value.details() == 'Endpoints belonging to Device(DEV1) cannot be modified.' - - # should fail with any change detected - with pytest.raises(grpc._channel._InactiveRpcError) as e: - copy_device = copy.deepcopy(DEVICE) - copy_device['device_config']['device_config'] = '' - copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value - copy_device['endpointList'].clear() - device_client.ConfigureDevice(Device(**copy_device)) - assert e.value.code() == grpc.StatusCode.ABORTED - msg = 'Any change has been requested for Device(DEV1). '\ - 'Either specify a new configuration or a new device operational status.' - assert e.value.details() == msg - -def test_configure_device(device_client : DeviceClient): - # should work - copy_device = copy.deepcopy(DEVICE) - copy_device['device_config']['device_config'] = '<new_config/>' - copy_device['devOperationalStatus'] = OperationalStatus.DISABLED.value - copy_device['endpointList'].clear() - validate_device_id(MessageToDict( - device_client.ConfigureDevice(Device(**copy_device)), - including_default_value_fields=True, preserving_proto_field_name=True, - use_integers_for_enums=False))