diff --git a/scripts/run_tests_locally-device-all.sh b/scripts/run_tests_locally-device-all.sh new file mode 100755 index 0000000000000000000000000000000000000000..2cf8faaf50355a3cc5f3a0206498ed4dacb48523 --- /dev/null +++ b/scripts/run_tests_locally-device-all.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc + +# Run unitary tests and analyze coverage of code at same time + +# Useful flags for pytest: +#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_emulated.py + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_openconfig.py + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_tapi.py + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_p4.py + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_microwave.py diff --git a/src/device/tests/MockMonitoringServiceServicerImpl.py b/scripts/run_tests_locally-device-emulated.sh old mode 100644 new mode 100755 similarity index 54% rename from src/device/tests/MockMonitoringServiceServicerImpl.py rename to scripts/run_tests_locally-device-emulated.sh index 05ca43dda0418df151bc3dfe255a90d75b50a088..ab4f77adaf9c0549551c91d944c1c6db77a8b9cb --- a/src/device/tests/MockMonitoringServiceServicerImpl.py +++ b/scripts/run_tests_locally-device-emulated.sh @@ -1,3 +1,4 @@ +#!/bin/bash # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,18 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from queue import Queue -from monitoring.proto.context_pb2 import Empty -from monitoring.proto.monitoring_pb2 import Kpi -from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceServicer -LOGGER = logging.getLogger(__name__) +PROJECTDIR=`pwd` -class MockMonitoringServiceServicerImpl(MonitoringServiceServicer): - def __init__(self, queue_samples : Queue): - self.queue_samples = queue_samples +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc - def IncludeKpi(self, request : Kpi, context) -> Empty: - self.queue_samples.put(request) - return Empty() +# Run unitary tests and analyze coverage of code at same time + +# Useful flags for pytest: +#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_emulated.py diff --git a/scripts/run_test_microwave_device.sh b/scripts/run_tests_locally-device-microwave.sh similarity index 91% rename from scripts/run_test_microwave_device.sh rename to scripts/run_tests_locally-device-microwave.sh index 34317b56484108d8ef83ef1c1eb74fbc31bfc25c..e03630c9f63c65cae91464b76cc3ddc447835f42 100755 --- a/scripts/run_test_microwave_device.sh +++ b/scripts/run_tests_locally-device-microwave.sh @@ -24,5 +24,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc # Useful flags for pytest: #-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG -coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \ +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ device/tests/test_unitary_microwave.py diff --git a/scripts/run_tests_locally-device-openconfig.sh b/scripts/run_tests_locally-device-openconfig.sh new file mode 100755 index 0000000000000000000000000000000000000000..83d4a0545a3386395ead97f40d45c034350c73b9 --- /dev/null +++ b/scripts/run_tests_locally-device-openconfig.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc + +# Run unitary tests and analyze coverage of code at same time + +# Useful flags for pytest: +#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_openconfig.py diff --git a/scripts/run_tests_locally-device.sh b/scripts/run_tests_locally-device-p4.sh similarity index 96% rename from scripts/run_tests_locally-device.sh rename to scripts/run_tests_locally-device-p4.sh index ba6c0b6a58031720addc17cc0de9169e592099f5..36b381a3cd9214603456828b41e6d70b8c6c908d 100755 --- a/scripts/run_tests_locally-device.sh +++ b/scripts/run_tests_locally-device-p4.sh @@ -25,4 +25,4 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc #-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ - device/tests/test_unitary.py + device/tests/test_unitary_p4.py diff --git a/scripts/run_tests_locally-device-tapi.sh b/scripts/run_tests_locally-device-tapi.sh new file mode 100755 index 0000000000000000000000000000000000000000..a281466b677f256b2ce9fe7770bf2b052ef59126 --- /dev/null +++ b/scripts/run_tests_locally-device-tapi.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc + +# Run unitary tests and analyze coverage of code at same time + +# Useful flags for pytest: +#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG + +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + device/tests/test_unitary_tapi.py diff --git a/src/device/Config.py b/src/device/Config.py index 415ae7b01ced740a0dc09f215ae71ad553a2672e..70a33251242c51f49140e596b8208a19dd5245f7 100644 --- a/src/device/Config.py +++ b/src/device/Config.py @@ -12,21 +12,3 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -# General settings -LOG_LEVEL = logging.WARNING - -# gRPC settings -GRPC_SERVICE_PORT = 2020 -GRPC_MAX_WORKERS = 10 -GRPC_GRACE_PERIOD = 60 - -# Prometheus settings -METRICS_PORT = 9192 - -# Dependency micro-service connection settings -CONTEXT_SERVICE_HOST = '127.0.0.1' -CONTEXT_SERVICE_PORT = 1010 -MONITORING_SERVICE_HOST = '127.0.0.1' -MONITORING_SERVICE_PORT = 7070 diff --git a/src/device/client/DeviceClient.py b/src/device/client/DeviceClient.py index 2a9512411a2d9e55e4c5b0fa75d48fc54d810713..7fe54cb23babee7a2f3bc9d21082732d924f5eff 100644 --- a/src/device/client/DeviceClient.py +++ b/src/device/client/DeviceClient.py @@ -13,7 +13,10 @@ # limitations under the License. import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty from device.proto.device_pb2 import MonitoringSettings from device.proto.device_pb2_grpc import DeviceServiceStub @@ -24,8 +27,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class DeviceClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.DEVICE) + if not port: port = get_service_port_grpc(ServiceNameEnum.DEVICE) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None self.stub = None @@ -37,41 +42,41 @@ class DeviceClient: self.stub = DeviceServiceStub(self.channel) def close(self): - if(self.channel is not None): self.channel.close() + if self.channel is not None: self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR def AddDevice(self, request : Device) -> DeviceId: - LOGGER.debug('AddDevice request: {:s}'.format(str(request))) + LOGGER.debug('AddDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.AddDevice(request) - LOGGER.debug('AddDevice result: {:s}'.format(str(response))) + LOGGER.debug('AddDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ConfigureDevice(self, request : Device) -> DeviceId: - LOGGER.debug('ConfigureDevice request: {:s}'.format(str(request))) + LOGGER.debug('ConfigureDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ConfigureDevice(request) - LOGGER.debug('ConfigureDevice result: {:s}'.format(str(response))) + LOGGER.debug('ConfigureDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def DeleteDevice(self, request : DeviceId) -> Empty: - LOGGER.debug('DeleteDevice request: {:s}'.format(str(request))) + LOGGER.debug('DeleteDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.DeleteDevice(request) - LOGGER.debug('DeleteDevice result: {:s}'.format(str(response))) + LOGGER.debug('DeleteDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetInitialConfig(self, request : DeviceId) -> DeviceConfig: - LOGGER.debug('GetInitialConfig request: {:s}'.format(str(request))) + LOGGER.debug('GetInitialConfig request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetInitialConfig(request) - LOGGER.debug('GetInitialConfig result: {:s}'.format(str(response))) + LOGGER.debug('GetInitialConfig result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty: - LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request))) + LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.MonitorDeviceKpi(request) - LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response))) + LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/device/service/DeviceService.py b/src/device/service/DeviceService.py index bb2cc09535af579a24cf05687d2883d81c7c914b..4f9b032e8a224e89e48daebf52687cc892ca534a 100644 --- a/src/device/service/DeviceService.py +++ b/src/device/service/DeviceService.py @@ -12,76 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging -from concurrent import futures -from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc from common.orm.backend.BackendEnum import BackendEnum from common.orm.Database import Database from common.orm.Factory import get_database_backend -from context.client.ContextClient import ContextClient -from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from common.tools.service.GenericGrpcService import GenericGrpcService from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server -from monitoring.client.monitoring_client import MonitoringClient from .driver_api.DriverInstanceCache import DriverInstanceCache from .DeviceServiceServicerImpl import DeviceServiceServicerImpl from .MonitoringLoops import MonitoringLoops -BIND_ADDRESS = '0.0.0.0' -LOGGER = logging.getLogger(__name__) - -class DeviceService: - def __init__( - self, context_client : ContextClient, monitoring_client : MonitoringClient, - driver_instance_cache : DriverInstanceCache, - address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): - - self.context_client = context_client - self.monitoring_client = monitoring_client - self.driver_instance_cache = driver_instance_cache - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.device_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) - self.monitoring_loops = MonitoringLoops(monitoring_client, self.database) - - def start(self): - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) +class DeviceService(GenericGrpcService): + def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.DEVICE) + super().__init__(port, cls_name=cls_name) + database = Database(get_database_backend(backend=BackendEnum.INMEMORY)) + self.monitoring_loops = MonitoringLoops(database) + self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops) + def install_servicers(self): self.monitoring_loops.start() - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.device_servicer = DeviceServiceServicerImpl( - self.context_client, self.database, self.driver_instance_cache, self.monitoring_loops) add_DeviceServiceServicer_to_server(self.device_servicer, self.server) - self.health_servicer = HealthServicer( - experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) - LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) - self.server.start() - self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member - - LOGGER.debug('Service started') - def stop(self): - LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) + super().stop() self.monitoring_loops.stop() - LOGGER.debug('Service stopped') diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 8e00b344f0462ae56b289cfc5d33e6c1b1c42b7e..e328c76cdd29147163adc9713c86fc0efc31555d 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -49,11 +49,10 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DeviceServiceServicerImpl(DeviceServiceServicer): def __init__( - self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache, - monitoring_loops : MonitoringLoops): - + self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops + ) -> None: LOGGER.debug('Creating Servicer...') - self.context_client = context_client + self.context_client = ContextClient() self.database = database self.driver_instance_cache = driver_instance_cache self.monitoring_loops = monitoring_loops diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py index e5b671f7f06beade5ab9f8b6539527999d49b9e8..eff634c75537ed26fd77f02f6adb85f0b0555aa8 100644 --- a/src/device/service/MonitoringLoops.py +++ b/src/device/service/MonitoringLoops.py @@ -18,7 +18,7 @@ from typing import Dict from common.orm.Database import Database from common.orm.HighLevel import get_object from common.orm.backend.Tools import key_to_str -from monitoring.client.monitoring_client import MonitoringClient +from monitoring.client.MonitoringClient import MonitoringClient from monitoring.proto.monitoring_pb2 import Kpi from .database.KpiModel import KpiModel from .database.RelationModels import EndPointMonitorKpiModel @@ -55,8 +55,8 @@ class MonitoringLoop: self._collector_thread.join() class MonitoringLoops: - def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None: - self._monitoring_client = monitoring_client + def __init__(self, database : Database) -> None: + self._monitoring_client = MonitoringClient() self._database = database self._samples_queue = queue.Queue() self._running = threading.Event() @@ -82,7 +82,6 @@ class MonitoringLoops: def start(self): self._exporter_thread.start() - self._running.set() @property def is_running(self): return self._running.is_set() @@ -96,6 +95,7 @@ class MonitoringLoops: LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.') return + self._running.set() while not self._terminate.is_set(): try: sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) @@ -149,3 +149,5 @@ class MonitoringLoops: })) except: # pylint: disable=bare-except LOGGER.exception('Unable to format/send Kpi') + + self._running.clear() diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index 0e92cabba4ddae84a9d4cd938c7e8b31ab4f0531..1f0adfa8f1dd8b3e307ed202967b1d5195171f11 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -14,12 +14,10 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from common.Settings import get_setting, wait_for_environment_variables -from context.client.ContextClient import ContextClient -from device.Config import ( - CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, - METRICS_PORT, MONITORING_SERVICE_HOST, MONITORING_SERVICE_PORT) -from monitoring.client.monitoring_client import MonitoringClient +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, + wait_for_environment_variables) from .DeviceService import DeviceService from .driver_api.DriverFactory import DriverFactory from .driver_api.DriverInstanceCache import DriverInstanceCache @@ -35,12 +33,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD ) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) - + log_level = get_log_level() logging.basicConfig(level=log_level) logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) @@ -48,43 +41,25 @@ def main(): LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - 'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', - 'MONITORINGSERVICE_SERVICE_HOST', 'MONITORINGSERVICE_SERVICE_PORT_GRPC' + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) - context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) - context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT ) - monitoring_service_host = get_setting('MONITORINGSERVICE_SERVICE_HOST', default=MONITORING_SERVICE_HOST) - monitoring_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=MONITORING_SERVICE_PORT) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') # Start metrics server + metrics_port = get_metrics_port() start_http_server(metrics_port) - # Initialize Context Client - if context_service_host is None or context_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( - str(context_service_host), str(context_service_port))) - context_client = ContextClient(context_service_host, context_service_port) - - # Initialize Monitoring Client - if monitoring_service_host is None or monitoring_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format( - str(monitoring_service_host), str(monitoring_service_port))) - monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port) - # Initialize Driver framework driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) # Starting device service - grpc_service = DeviceService( - context_client, monitoring_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers, - grace_period=grace_period) + grpc_service = DeviceService(driver_instance_cache) grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/device/service/database/EndPointModel.py b/src/device/service/database/EndPointModel.py index 286a51db69738782a6d1acaed5b1d7846ac67b2b..84d0c97073481af162b1e66f7e35c93bc6e1eed5 100644 --- a/src/device/service/database/EndPointModel.py +++ b/src/device/service/database/EndPointModel.py @@ -15,6 +15,7 @@ import logging from typing import Dict, List from common.orm.Database import Database +from common.orm.HighLevel import update_or_create_object from common.orm.backend.Tools import key_to_str from common.orm.fields.EnumeratedField import EnumeratedField from common.orm.fields.ForeignKeyField import ForeignKeyField @@ -72,9 +73,14 @@ def set_endpoint_monitors(database : Database, db_endpoint : EndPointModel, grpc db_endpoint_pk = db_endpoint.pk for kpi_sample_type in grpc_endpoint_kpi_sample_types: orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type) - str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name]) - db_endpoint_kpi_sample_type = EndPointMonitorModel(database, str_endpoint_kpi_sample_type_key) - db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint - db_endpoint_kpi_sample_type.resource_key = '' # during initialization, allow empty value - db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type - db_endpoint_kpi_sample_type.save() + str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, str(orm_kpi_sample_type.value)]) + #db_endpoint_kpi_sample_type = EndPointMonitorModel(database, str_endpoint_kpi_sample_type_key) + #db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint + #db_endpoint_kpi_sample_type.resource_key = '' # during initialization, allow empty value + #db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type + #db_endpoint_kpi_sample_type.save() + update_or_create_object(database, EndPointMonitorModel, str_endpoint_kpi_sample_type_key, { + 'endpoint_fk' : db_endpoint, + #'resource_key' : '', # during initialization, allow empty value + 'kpi_sample_type': orm_kpi_sample_type, + }) diff --git a/src/device/service/driver_api/DriverFactory.py b/src/device/service/driver_api/DriverFactory.py index 1e79b4ba45593d3f24f7193648010071d766ec58..b2b6c467a9d7c941a430e7bc7aaa1ab123053750 100644 --- a/src/device/service/driver_api/DriverFactory.py +++ b/src/device/service/driver_api/DriverFactory.py @@ -76,6 +76,7 @@ class DriverFactory: field_candidate_driver_classes = field_candidate_driver_classes.union(field_indice_drivers) if candidate_driver_classes is None: + if len(field_candidate_driver_classes) == 0: continue candidate_driver_classes = {k:1 for k in field_candidate_driver_classes} else: for candidate_driver_class in candidate_driver_classes: diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index 664b52821224f4d53b17cb5e10e44461ac7b75f4..40912f50b98f1d5fc9555d87a4855a12ab8e0c07 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -23,6 +23,11 @@ from .microwave.IETFApiDriver import IETFApiDriver DRIVERS = [ (EmulatedDriver, [ { + # Driver==unspecified & no device type specified => use Emulated + FilterFieldEnum.DRIVER: ORM_DeviceDriverEnum.UNDEFINED, + }, + { + # Emulated OLS/Packet Router, specifying Undefined/OpenConfig/TAPI Driver => use EmulatedDriver FilterFieldEnum.DEVICE_TYPE: [ DeviceTypeEnum.EMULATED_OPTICAL_LINE_SYSTEM, DeviceTypeEnum.EMULATED_PACKET_ROUTER, @@ -36,18 +41,21 @@ DRIVERS = [ ]), (OpenConfigDriver, [ { + # Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, } ]), (TransportApiDriver, [ { + # Real OLS, specifying TAPI Driver => use TransportApiDriver FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPTICAL_LINE_SYSTEM, FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, } ]), (P4Driver, [ { + # Real P4 Switch, specifying P4 Driver => use P4Driver FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.P4_SWITCH, FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.P4, } diff --git a/src/device/tests/MockMonitoringService.py b/src/device/tests/MockMonitoringService.py deleted file mode 100644 index 3e8550058daa905517f26a659a08c66db1172d74..0000000000000000000000000000000000000000 --- a/src/device/tests/MockMonitoringService.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, logging -from concurrent import futures -from queue import Queue -from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD -from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server -from .MockMonitoringServiceServicerImpl import MockMonitoringServiceServicerImpl - -BIND_ADDRESS = '0.0.0.0' -LOGGER = logging.getLogger(__name__) - -class MockMonitoringService: - def __init__( - self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): - - self.queue_samples = Queue() - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.monitoring_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.monitoring_servicer = MockMonitoringServiceServicerImpl(self.queue_samples) - add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) - LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) - self.server.start() - - LOGGER.debug('Service started') - - def stop(self): - LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) - self.server.stop(self.grace_period) - LOGGER.debug('Service stopped') diff --git a/src/device/tests/MockService_Dependencies.py b/src/device/tests/MockService_Dependencies.py new file mode 100644 index 0000000000000000000000000000000000000000..6b2a7788fa3fa242edb9cc7c4b10e22244d7c99a --- /dev/null +++ b/src/device/tests/MockService_Dependencies.py @@ -0,0 +1,50 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os, queue +from typing import Union +from common.Constants import ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name +from common.tests.MockServicerImpl_Context import MockServicerImpl_Context +from common.tests.MockServicerImpl_Monitoring import MockServicerImpl_Monitoring +from common.tools.service.GenericGrpcService import GenericGrpcService +from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server +from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server + +LOCAL_HOST = '127.0.0.1' + +SERVICE_CONTEXT = ServiceNameEnum.CONTEXT +SERVICE_MONITORING = ServiceNameEnum.MONITORING + +class MockService_Dependencies(GenericGrpcService): + # Mock Service implementing Context and Monitoring to simplify unitary tests of Device + + def __init__(self, bind_port: Union[str, int]) -> None: + super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + + # pylint: disable=attribute-defined-outside-init + def install_servicers(self): + self.context_servicer = MockServicerImpl_Context() + add_ContextServiceServicer_to_server(self.context_servicer, self.server) + + self.queue_samples = queue.Queue() + self.monitoring_servicer = MockServicerImpl_Monitoring(queue_samples=self.queue_samples) + add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) + + def configure_env_vars(self): + os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) + os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) + + os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) + os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) diff --git a/src/device/tests/PrepareTestScenario.py b/src/device/tests/PrepareTestScenario.py new file mode 100644 index 0000000000000000000000000000000000000000..08991221a3f5121c587ecfd4644a6b28156ccefd --- /dev/null +++ b/src/device/tests/PrepareTestScenario.py @@ -0,0 +1,80 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest, os +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import Context, Topology +from device.client.DeviceClient import DeviceClient +from device.service.DeviceService import DeviceService +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from device.service.drivers import DRIVERS +from device.tests.CommonObjects import CONTEXT, TOPOLOGY +from device.tests.MockService_Dependencies import MockService_Dependencies +from monitoring.client.MonitoringClient import MonitoringClient + +LOCAL_HOST = '127.0.0.1' +MOCKSERVICE_PORT = 10000 +DEVICE_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports +os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT) + +@pytest.fixture(scope='session') +def mock_service(): + _service = MockService_Dependencies(MOCKSERVICE_PORT) + _service.configure_env_vars() + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def context_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + _client = ContextClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def monitoring_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + _client = MonitoringClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_service( + context_client : ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name + + _driver_factory = DriverFactory(DRIVERS) + _driver_instance_cache = DriverInstanceCache(_driver_factory) + _service = DeviceService(_driver_instance_cache) + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name + _client = DeviceClient() + yield _client + _client.close() + +def test_prepare_environment( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + context_client.SetContext(Context(**CONTEXT)) + context_client.SetTopology(Topology(**TOPOLOGY)) diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py deleted file mode 100644 index 0853da9a5e3572b15e5581413d1a5c765e02444e..0000000000000000000000000000000000000000 --- a/src/device/tests/test_unitary.py +++ /dev/null @@ -1,1010 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import calendar, copy, dateutil.parser, grpc, json, logging, operator, os, pytest, queue, time -from datetime import datetime, timezone -from typing import Tuple -from common.orm.Database import Database -from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum -from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum -from common.message_broker.MessageBroker import MessageBroker -from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id -from context.Config import ( - GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) -from context.client.ContextClient import ContextClient -from context.proto.context_pb2 import DeviceId, DeviceOperationalStatusEnum -from context.service.grpc_server.ContextService import ContextService -from device.Config import ( - GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD) -from device.client.DeviceClient import DeviceClient -from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology -from device.proto.device_pb2 import MonitoringSettings -from device.proto.kpi_sample_types_pb2 import KpiSampleType -from device.service.DeviceService import DeviceService -from device.service.driver_api._Driver import _Driver -from device.service.driver_api.DriverFactory import DriverFactory -from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -from device.service.drivers import DRIVERS -from device.tests.MockMonitoringService import MockMonitoringService -from monitoring.Config import ( - GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD) -from monitoring.client.monitoring_client import MonitoringClient -from .CommonObjects import CONTEXT, TOPOLOGY - -from .Device_Emulated import ( - DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES, - DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_EP_DESCS, DEVICE_EMU_ENDPOINTS_COOKED, - DEVICE_EMU_ID, DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID) -ENABLE_EMULATED = True - -try: - from .Device_OpenConfig_Infinera1 import( - #from .Device_OpenConfig_Infinera2 import( - #from .Device_OpenConfig_Cisco import( - #from .Device_OpenConfig_Adva import( - - DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID, - DEVICE_OC_UUID) - ENABLE_OPENCONFIG = True -except ImportError: - ENABLE_OPENCONFIG = False - -try: - from .Device_Transport_Api_CTTC import ( - DEVICE_TAPI, DEVICE_TAPI_CONNECT_RULES, DEVICE_TAPI_UUID, DEVICE_TAPI_ID, DEVICE_TAPI_CONFIG_RULES, - DEVICE_TAPI_DECONFIG_RULES) - ENABLE_TAPI = True -except ImportError: - ENABLE_TAPI = False - -from .mock_p4runtime_service import MockP4RuntimeService -try: - from .device_p4 import( - DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_NAME, DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS, - DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES, DEVICE_P4_CONFIG_RULES) - ENABLE_P4 = True -except ImportError: - ENABLE_P4 = False - -#ENABLE_EMULATED = False # set to False to disable tests of Emulated devices -#ENABLE_OPENCONFIG = False # set to False to disable tests of OpenConfig devices -#ENABLE_TAPI = False # set to False to disable tests of TAPI devices -#ENABLE_P4 = False # set to False to disable tests of P4 devices - -ENABLE_OPENCONFIG_CONFIGURE = True -ENABLE_OPENCONFIG_MONITOR = True -ENABLE_OPENCONFIG_DECONFIGURE = True - - -logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) -logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) -logging.getLogger('monitoring-client').setLevel(logging.WARNING) - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - -CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports -DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports -MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports - -DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' -DEFAULT_REDIS_SERVICE_PORT = 6379 -DEFAULT_REDIS_DATABASE_ID = 0 - -REDIS_CONFIG = { - 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), - 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), - 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), -} - -SCENARIOS = [ - ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), - #('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG), -] - -@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) -def context_db_mb(request) -> Tuple[Database, MessageBroker]: - name,db_backend,db_settings,mb_backend,mb_settings = request.param - msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' - LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) - _database = Database(get_database_backend(backend=db_backend, **db_settings)) - _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) - yield _database, _message_broker - _message_broker.terminate() - -@pytest.fixture(scope='session') -def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - _service = ContextService( - context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS, - grace_period=CONTEXT_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT) - yield _client - _client.close() - -@pytest.fixture(scope='session') -def monitoring_service(): - _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS, - grace_period=MONITORING_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name - _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT) - #yield _client - #_client.close() - return _client - -@pytest.fixture(scope='session') -def device_service( - context_client : ContextClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name - - _driver_factory = DriverFactory(DRIVERS) - _driver_instance_cache = DriverInstanceCache(_driver_factory) - _service = DeviceService( - context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, - max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name - _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT) - yield _client - _client.close() - -@pytest.fixture(scope='session') -def p4runtime_service(): - _service = MockP4RuntimeService( - address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, - max_workers=DEVICE_P4_WORKERS, - grace_period=DEVICE_P4_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - - -def test_prepare_environment( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - context_client.SetContext(Context(**CONTEXT)) - context_client.SetTopology(Topology(**TOPOLOGY)) - - -# ----- Test Device Driver Emulated -------------------------------------------- -# Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other -# Drivers might support a different set of resource paths, and attributes/values per resource; however, they must -# implement the Driver API. - -def test_device_emulated_add_error_cases( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - with pytest.raises(grpc.RpcError) as e: - DEVICE_EMU_WITH_ENDPOINTS = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_ENDPOINTS['device_endpoints'].append(json_endpoint(DEVICE_EMU_ID, 'ep-id', 'ep-type')) - device_client.AddDevice(Device(**DEVICE_EMU_WITH_ENDPOINTS)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg_head = 'device.device_endpoints([' - msg_tail = ']) is invalid; RPC method AddDevice does not accept Endpoints. '\ - 'Endpoints are discovered through interrogation of the physical device.' - except_msg = str(e.value.details()) - assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - - with pytest.raises(grpc.RpcError) as e: - DEVICE_EMU_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) - DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) - device_client.AddDevice(Device(**DEVICE_EMU_WITH_EXTRA_RULES)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg_head = 'device.device_config.config_rules([' - msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ - 'with "_connect/" tag. Others should be configured after adding the device.' - except_msg = str(e.value.details()) - assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - - -def test_device_emulated_add_correct( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - DEVICE_EMU_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) - device_client.AddDevice(Device(**DEVICE_EMU_WITH_CONNECT_RULES)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now - assert driver is not None - - -def test_device_emulated_get( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID)) - LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) - LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) - - -def test_device_emulated_configure( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now - assert driver is not None - - driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) - for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: - assert endpoint_cooked in driver_config - - DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) - - DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ADDRESSES) - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) - - DEVICE_EMU_WITH_OPERATIONAL_STATUS = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_OPERATIONAL_STATUS['device_operational_status'] = \ - DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_OPERATIONAL_STATUS)) - - driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) - for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: - endpoint_cooked = copy.deepcopy(endpoint_cooked) - endpoint_cooked[1]['enabled'] = True - assert endpoint_cooked in driver_config - for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: - assert (config_rule['resource_key'], json.loads(config_rule['resource_value'])) in driver_config - - device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) - assert device_data.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED - - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - RESULTING_CONFIG_ENDPOINTS = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} - for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: - values = json.loads(RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value']) - values.update(endpoint_cooked[1]) - RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) - for config_rule in RESULTING_CONFIG_ENDPOINTS.values(): - config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], - json.loads(json.dumps(config_rule['resource_value']))) - assert config_rule in config_rules - for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: - config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], - json.loads(json.dumps(config_rule['resource_value']))) - assert config_rule in config_rules - - # Try to reconfigure... - - DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_RECONFIG_RULES['device_operational_status'] = \ - DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED - DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_ADDRESSES) - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_RECONFIG_RULES)) - - RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} - for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: - values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) - values.update(endpoint_cooked[1]) - RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) - RESULTING_CONFIG_RULES.update({cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ADDRESSES)}) - for reconfig_rule in DEVICE_EMU_RECONFIG_ADDRESSES: - if reconfig_rule['action'] == ConfigActionEnum.CONFIGACTION_DELETE: - RESULTING_CONFIG_RULES.pop(reconfig_rule['resource_key'], None) - else: - RESULTING_CONFIG_RULES[reconfig_rule['resource_key']] = reconfig_rule - RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() - #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) - - driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == len(RESULTING_CONFIG_RULES) - for config_rule in RESULTING_CONFIG_RULES: - resource = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] - assert resource in driver_config - - device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in RESULTING_CONFIG_RULES: - config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) - assert config_rule in config_rules - - -def test_device_emulated_monitor( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService, # pylint: disable=redefined-outer-name - monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - device_uuid = DEVICE_EMU_UUID - json_device_id = DEVICE_EMU_ID - device_id = DeviceId(**json_device_id) - device_data = context_client.GetDevice(device_id) - #LOGGER.info('device_data = \n{:s}'.format(str(device_data))) - - driver : _Driver = device_service.driver_instance_cache.get(device_uuid) # we know the driver exists now - assert driver is not None - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - #assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) - - SAMPLING_DURATION_SEC = 10.0 - SAMPLING_INTERVAL_SEC = 2.0 - - MONITORING_SETTINGS_LIST = [] - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {} - for endpoint in device_data.device_endpoints: - endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid - for sample_type_id in endpoint.kpi_sample_types: - sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '') - kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id)) - monitoring_settings = { - 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, - 'kpi_descriptor': { - 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format( - sample_type_name, endpoint_uuid, device_uuid), - 'kpi_sample_type': sample_type_id, - 'device_id': json_device_id, - 'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid), - }, - 'sampling_duration_s': SAMPLING_DURATION_SEC, - 'sampling_interval_s': SAMPLING_INTERVAL_SEC, - } - MONITORING_SETTINGS_LIST.append(monitoring_settings) - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0 - - NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC - NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI - - # Start monitoring the device - t_start_monitoring = datetime.timestamp(datetime.utcnow()) - for monitoring_settings in MONITORING_SETTINGS_LIST: - device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings)) - - # wait to receive the expected number of samples - # if takes more than 1.5 times the sampling duration, assume there is an error - time_ini = time.time() - queue_samples : queue.Queue = monitoring_service.queue_samples - received_samples = [] - while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5): - try: - received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED) - #LOGGER.info('received_sample = {:s}'.format(str(received_sample))) - received_samples.append(received_sample) - except queue.Empty: - continue - - t_end_monitoring = datetime.timestamp(datetime.utcnow()) - - #LOGGER.info('received_samples = {:s}'.format(str(received_samples))) - LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples)))) - LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED))) - assert len(received_samples) == NUM_SAMPLES_EXPECTED - for received_sample in received_samples: - kpi_uuid = received_sample.kpi_id.kpi_id.uuid - assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED - assert isinstance(received_sample.timestamp, str) - try: - timestamp = float(received_sample.timestamp) - except ValueError: - dt_time = dateutil.parser.isoparse(received_sample.timestamp).replace(tzinfo=timezone.utc) - timestamp = float(calendar.timegm(dt_time.timetuple())) + (dt_time.microsecond / 1.e6) - assert timestamp > t_start_monitoring - assert timestamp < t_end_monitoring - assert received_sample.kpi_value.HasField('floatVal') or received_sample.kpi_value.HasField('intVal') - kpi_value = getattr(received_sample.kpi_value, received_sample.kpi_value.WhichOneof('value')) - assert isinstance(kpi_value, (float, int)) - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1 - - LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED))) - for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items(): - assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI - - # Unsubscribe monitoring - for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.keys(): - MONITORING_SETTINGS_UNSUBSCRIBE = { - 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, - 'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsibscribe - 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe - } - device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE)) - - -def test_device_emulated_deconfigure( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now - assert driver is not None - - driver_config = driver.GetConfig() - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_DECONFIG_RULES['device_operational_status'] = \ - DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED - DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ADDRESSES) - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) - - RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} - for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: - values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) - values.update(endpoint_cooked[1]) - RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) - RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() - driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys - driver_config = list(filter( - lambda config_rule: ( - not isinstance(config_rule[1], str) or not config_rule[1].startswith('do_sampling (trigger:')), - driver_config)) - LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) - assert len(driver_config) == len(RESULTING_CONFIG_RULES) - for config_rule in RESULTING_CONFIG_RULES: - config_rule = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] - #LOGGER.info('config_rule = {:s}'.format(str(config_rule))) - assert config_rule in driver_config - - DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) - DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ENDPOINTS) - device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) - - driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - assert len(driver_config) == 0 - - device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) - config_rules = device_data.device_config.config_rules - LOGGER.info('config_rules = {:s}'.format(str(config_rules))) - clean_config_rules = [] - for config_rule in config_rules: - config_rule_value = json.loads(config_rule.resource_value) - if not isinstance(config_rule_value, str): clean_config_rules.append(config_rule) - if config_rule_value.startswith('do_sampling (trigger:'): continue - clean_config_rules.append(config_rule) - LOGGER.info('clean_config_rules = {:s}'.format(str(clean_config_rules))) - assert len(clean_config_rules) == 0 - - -def test_device_emulated_delete( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_EMULATED: pytest.skip('Skipping test: No Emulated device has been configured') - - device_client.DeleteDevice(DeviceId(**DEVICE_EMU_ID)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID, {}) - assert driver is None - - -# ----- Test Device Driver OpenConfig ------------------------------------------ - -def test_device_openconfig_add_error_cases( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - - with pytest.raises(grpc.RpcError) as e: - DEVICE_OC_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_OC) - DEVICE_OC_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONNECT_RULES) - DEVICE_OC_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONFIG_RULES) - device_client.AddDevice(Device(**DEVICE_OC_WITH_EXTRA_RULES)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg_head = 'device.device_config.config_rules([' - msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ - 'with "_connect/" tag. Others should be configured after adding the device.' - except_msg = str(e.value.details()) - assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - - -def test_device_openconfig_add_correct( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - - DEVICE_OC_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_OC) - DEVICE_OC_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONNECT_RULES) - device_client.AddDevice(Device(**DEVICE_OC_WITH_CONNECT_RULES)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now - assert driver is not None - - device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - - -def test_device_openconfig_get( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - - initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_OC_ID)) - LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) - LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) - - -def test_device_openconfig_configure( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - if not ENABLE_OPENCONFIG_CONFIGURE: pytest.skip('Skipping test OpenConfig configure') - - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now - assert driver is not None - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - DEVICE_OC_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_OC) - DEVICE_OC_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONFIG_RULES) - device_client.ConfigureDevice(Device(**DEVICE_OC_WITH_CONFIG_RULES)) - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in DEVICE_OC_CONFIG_RULES: - config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) - assert config_rule in config_rules - - -def test_device_openconfig_monitor( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService, # pylint: disable=redefined-outer-name - monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - if not ENABLE_OPENCONFIG_MONITOR: pytest.skip('Skipping test OpenConfig monitor') - - device_uuid = DEVICE_OC_UUID - json_device_id = DEVICE_OC_ID - device_id = DeviceId(**json_device_id) - device_data = context_client.GetDevice(device_id) - #LOGGER.info('device_data = \n{:s}'.format(str(device_data))) - - driver : _Driver = device_service.driver_instance_cache.get(device_uuid) # we know the driver exists now - assert driver is not None - - SAMPLING_DURATION_SEC = 60.0 - SAMPLING_INTERVAL_SEC = 15.0 - - MONITORING_SETTINGS_LIST = [] - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {} - for endpoint in device_data.device_endpoints: - endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid - for sample_type_id in endpoint.kpi_sample_types: - sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '') - kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id)) - monitoring_settings = { - 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, - 'kpi_descriptor': { - 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format( - sample_type_name, endpoint_uuid, device_uuid), - 'kpi_sample_type': sample_type_id, - 'device_id': json_device_id, - 'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid), - }, - 'sampling_duration_s': SAMPLING_DURATION_SEC, - 'sampling_interval_s': SAMPLING_INTERVAL_SEC, - } - MONITORING_SETTINGS_LIST.append(monitoring_settings) - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0 - - NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC - NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI - - # Start monitoring the device - t_start_monitoring = datetime.timestamp(datetime.utcnow()) - for monitoring_settings in MONITORING_SETTINGS_LIST: - device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings)) - - # wait to receive the expected number of samples - # if takes more than 1.5 times the sampling duration, assume there is an error - time_ini = time.time() - queue_samples : queue.Queue = monitoring_service.queue_samples - received_samples = [] - while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5): - try: - received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED) - #LOGGER.info('received_sample = {:s}'.format(str(received_sample))) - received_samples.append(received_sample) - except queue.Empty: - continue - - t_end_monitoring = datetime.timestamp(datetime.utcnow()) - - #LOGGER.info('received_samples = {:s}'.format(str(received_samples))) - LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples)))) - LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED))) - #assert len(received_samples) == NUM_SAMPLES_EXPECTED - for received_sample in received_samples: - kpi_uuid = received_sample.kpi_id.kpi_id.uuid - assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED - assert isinstance(received_sample.timestamp, str) - try: - timestamp = float(received_sample.timestamp) - except ValueError: - dt_time = dateutil.parser.isoparse(received_sample.timestamp).replace(tzinfo=timezone.utc) - timestamp = float(calendar.timegm(dt_time.timetuple())) + (dt_time.microsecond / 1.e6) - assert timestamp > t_start_monitoring - assert timestamp < t_end_monitoring - assert received_sample.kpi_value.HasField('floatVal') or received_sample.kpi_value.HasField('intVal') - kpi_value = getattr(received_sample.kpi_value, received_sample.kpi_value.WhichOneof('value')) - assert isinstance(kpi_value, (float, int)) - KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1 - - LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED))) - # TODO: review why num_samples_received per KPI != NUM_SAMPLES_EXPECTED_PER_KPI - #for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items(): - # assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI - - # Unsubscribe monitoring - for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.keys(): - MONITORING_SETTINGS_UNSUBSCRIBE = { - 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, - 'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsibscribe - 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe - } - device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE)) - - -def test_device_openconfig_deconfigure( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - if not ENABLE_OPENCONFIG_DECONFIGURE: pytest.skip('Skipping test OpenConfig deconfigure') - - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now - assert driver is not None - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - DEVICE_OC_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_OC) - DEVICE_OC_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_OC_DECONFIG_RULES) - device_client.ConfigureDevice(Device(**DEVICE_OC_WITH_DECONFIG_RULES)) - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in DEVICE_OC_DECONFIG_RULES: - action_set = ConfigActionEnum.Name(ConfigActionEnum.CONFIGACTION_SET) - config_rule = (action_set, config_rule['resource_key'], config_rule['resource_value']) - assert config_rule not in config_rules - - -def test_device_openconfig_delete( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') - - device_client.DeleteDevice(DeviceId(**DEVICE_OC_ID)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_OC_UUID, {}) - assert driver is None - - -# ----- Test Device Driver TAPI ------------------------------------------------ - -def test_device_tapi_add_error_cases( - device_client : DeviceClient): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - with pytest.raises(grpc.RpcError) as e: - DEVICE_TAPI_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_TAPI) - DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES) - DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES) - device_client.AddDevice(Device(**DEVICE_TAPI_WITH_EXTRA_RULES)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg_head = 'device.device_config.config_rules([' - msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ - 'with "_connect/" tag. Others should be configured after adding the device.' - except_msg = str(e.value.details()) - assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - - -def test_device_tapi_add_correct( - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - DEVICE_TAPI_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_TAPI) - DEVICE_TAPI_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES) - device_client.AddDevice(Device(**DEVICE_TAPI_WITH_CONNECT_RULES)) - driver: _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID) - assert driver is not None - - -def test_device_tapi_get( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_TAPI_ID)) - LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) - LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) - - -def test_device_tapi_configure( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID) - assert driver is not None - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - DEVICE_TAPI_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_TAPI) - DEVICE_TAPI_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES) - device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_CONFIG_RULES)) - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in DEVICE_TAPI_CONFIG_RULES: - config_rule = ( - ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) - assert config_rule in config_rules - - -def test_device_tapi_deconfigure( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - driver: _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID) - assert driver is not None - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - DEVICE_TAPI_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_TAPI) - DEVICE_TAPI_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_DECONFIG_RULES) - device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_DECONFIG_RULES)) - - # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. - #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) - #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) - config_rules = [ - (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) - for config_rule in device_data.device_config.config_rules - ] - LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( - '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) - for config_rule in DEVICE_TAPI_DECONFIG_RULES: - action_set = ConfigActionEnum.Name(ConfigActionEnum.CONFIGACTION_SET) - config_rule = (action_set, config_rule['resource_key'], config_rule['resource_value']) - assert config_rule not in config_rules - - -def test_device_tapi_delete( - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') - - device_client.DeleteDevice(DeviceId(**DEVICE_TAPI_ID)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_TAPI_UUID, {}) - assert driver is None - - -# ----- Test Device Driver P4 -------------------------------------------------- - -def test_device_p4_add_error_cases( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip( - 'Skipping test: No P4 device has been configured') - - with pytest.raises(grpc.RpcError) as e: - device_p4_with_extra_rules = copy.deepcopy(DEVICE_P4) - device_p4_with_extra_rules['device_config']['config_rules'].extend( - DEVICE_P4_CONNECT_RULES) - device_p4_with_extra_rules['device_config']['config_rules'].extend( - DEVICE_P4_CONFIG_RULES) - device_client.AddDevice(Device(**device_p4_with_extra_rules)) - assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg_head = 'device.device_config.config_rules([' - msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ - 'with "_connect/" tag. Others should be configured after adding the device.' - except_msg = str(e.value.details()) - assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - - -def test_device_p4_add_correct( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService, # pylint: disable=redefined-outer-name - p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip( - 'Skipping test: No P4 device has been configured') - - device_p4_with_connect_rules = copy.deepcopy(DEVICE_P4) - device_p4_with_connect_rules['device_config']['config_rules'].extend( - DEVICE_P4_CONNECT_RULES) - device_client.AddDevice(Device(**device_p4_with_connect_rules)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_UUID) - assert driver is not None - - -def test_device_p4_get( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService, # pylint: disable=redefined-outer-name - p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip( - 'Skipping test: No P4 device has been configured') - - initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_P4_ID)) - LOGGER.info('initial_config = {:s}'.format( - grpc_message_to_json_string(initial_config))) - - device_data = context_client.GetDevice(DeviceId(**DEVICE_P4_ID)) - LOGGER.info('device_data = {:s}'.format( - grpc_message_to_json_string(device_data))) - - -def test_device_p4_configure( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService, # pylint: disable=redefined-outer-name - p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip( - 'Skipping test: No P4 device has been configured') - - pytest.skip('Skipping test for unimplemented method') - - -def test_device_p4_deconfigure( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService, # pylint: disable=redefined-outer-name - p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip( - 'Skipping test: No P4 device has been configured') - - pytest.skip('Skipping test for unimplemented method') - - -def test_device_p4_delete( - context_client: ContextClient, # pylint: disable=redefined-outer-name - device_client: DeviceClient, # pylint: disable=redefined-outer-name - device_service: DeviceService, # pylint: disable=redefined-outer-name - p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name - - if not ENABLE_P4: pytest.skip('Skipping test: No P4 device has been configured') - - device_client.DeleteDevice(DeviceId(**DEVICE_P4_ID)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_P4_UUID) - assert driver is None diff --git a/src/device/tests/test_unitary_emulated.py b/src/device/tests/test_unitary_emulated.py new file mode 100644 index 0000000000000000000000000000000000000000..67a2e9c33c11711ed3343c688f7bc5a88316eca0 --- /dev/null +++ b/src/device/tests/test_unitary_emulated.py @@ -0,0 +1,378 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import calendar, copy, dateutil.parser, grpc, json, logging, operator, pytest, queue, time +from datetime import datetime, timezone +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import DeviceId, DeviceOperationalStatusEnum +from device.client.DeviceClient import DeviceClient +from device.proto.context_pb2 import ConfigActionEnum, Device +from device.proto.device_pb2 import MonitoringSettings +from device.proto.kpi_sample_types_pb2 import KpiSampleType +from device.service.DeviceService import DeviceService +from device.service.driver_api._Driver import _Driver +from .MockService_Dependencies import MockService_Dependencies +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) + +from .Device_Emulated import ( + DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES, + DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_ENDPOINTS_COOKED, DEVICE_EMU_ID, + DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID) + +logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) +logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) +logging.getLogger('monitoring-client').setLevel(logging.WARNING) + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +# ----- Test Device Driver Emulated -------------------------------------------- +# Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other +# Drivers might support a different set of resource paths, and attributes/values per resource; however, they must +# implement the Driver API. + +def test_device_emulated_add_error_cases( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + with pytest.raises(grpc.RpcError) as e: + DEVICE_EMU_WITH_ENDPOINTS = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_ENDPOINTS['device_endpoints'].append(json_endpoint(DEVICE_EMU_ID, 'ep-id', 'ep-type')) + device_client.AddDevice(Device(**DEVICE_EMU_WITH_ENDPOINTS)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_endpoints([' + msg_tail = ']) is invalid; RPC method AddDevice does not accept Endpoints. '\ + 'Endpoints are discovered through interrogation of the physical device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + with pytest.raises(grpc.RpcError) as e: + DEVICE_EMU_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) + DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) + device_client.AddDevice(Device(**DEVICE_EMU_WITH_EXTRA_RULES)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + +def test_device_emulated_add_correct( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + DEVICE_EMU_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES) + device_client.AddDevice(Device(**DEVICE_EMU_WITH_CONNECT_RULES)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now + assert driver is not None + + +def test_device_emulated_get( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID)) + LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + + +def test_device_emulated_configure( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now + assert driver is not None + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + assert endpoint_cooked in driver_config + + DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) + + DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ADDRESSES) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES)) + + DEVICE_EMU_WITH_OPERATIONAL_STATUS = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_OPERATIONAL_STATUS['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_OPERATIONAL_STATUS)) + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + endpoint_cooked = copy.deepcopy(endpoint_cooked) + endpoint_cooked[1]['enabled'] = True + assert endpoint_cooked in driver_config + for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: + assert (config_rule['resource_key'], json.loads(config_rule['resource_value'])) in driver_config + + device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + assert device_data.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + RESULTING_CONFIG_ENDPOINTS = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + for config_rule in RESULTING_CONFIG_ENDPOINTS.values(): + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], + json.loads(json.dumps(config_rule['resource_value']))) + assert config_rule in config_rules + for config_rule in DEVICE_EMU_CONFIG_ADDRESSES: + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], + json.loads(json.dumps(config_rule['resource_value']))) + assert config_rule in config_rules + + # Try to reconfigure... + + DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_RECONFIG_RULES['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_ADDRESSES) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_RECONFIG_RULES)) + + RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + RESULTING_CONFIG_RULES.update({cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ADDRESSES)}) + for reconfig_rule in DEVICE_EMU_RECONFIG_ADDRESSES: + if reconfig_rule['action'] == ConfigActionEnum.CONFIGACTION_DELETE: + RESULTING_CONFIG_RULES.pop(reconfig_rule['resource_key'], None) + else: + RESULTING_CONFIG_RULES[reconfig_rule['resource_key']] = reconfig_rule + RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() + #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == len(RESULTING_CONFIG_RULES) + for config_rule in RESULTING_CONFIG_RULES: + resource = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] + assert resource in driver_config + + device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in RESULTING_CONFIG_RULES: + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) + assert config_rule in config_rules + + +def test_device_emulated_monitor( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService, # pylint: disable=redefined-outer-name + mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + + device_uuid = DEVICE_EMU_UUID + json_device_id = DEVICE_EMU_ID + device_id = DeviceId(**json_device_id) + device_data = context_client.GetDevice(device_id) + LOGGER.info('device_data = \n{:s}'.format(str(device_data))) + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(device_uuid) # we know the driver exists now + assert driver is not None + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + #assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES) + + SAMPLING_DURATION_SEC = 10.0 + SAMPLING_INTERVAL_SEC = 2.0 + + MONITORING_SETTINGS_LIST = [] + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {} + for endpoint in device_data.device_endpoints: + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + for sample_type_id in endpoint.kpi_sample_types: + sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '') + kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id)) + monitoring_settings = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'kpi_descriptor': { + 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format( + sample_type_name, endpoint_uuid, device_uuid), + 'kpi_sample_type': sample_type_id, + 'device_id': json_device_id, + 'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid), + }, + 'sampling_duration_s': SAMPLING_DURATION_SEC, + 'sampling_interval_s': SAMPLING_INTERVAL_SEC, + } + MONITORING_SETTINGS_LIST.append(monitoring_settings) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0 + + NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC + NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI + + # Start monitoring the device + t_start_monitoring = datetime.timestamp(datetime.utcnow()) + for monitoring_settings in MONITORING_SETTINGS_LIST: + device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings)) + + # wait to receive the expected number of samples + # if takes more than 1.5 times the sampling duration, assume there is an error + time_ini = time.time() + queue_samples : queue.Queue = mock_service.queue_samples + received_samples = [] + while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5): + try: + received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED) + #LOGGER.info('received_sample = {:s}'.format(str(received_sample))) + received_samples.append(received_sample) + except queue.Empty: + continue + + t_end_monitoring = datetime.timestamp(datetime.utcnow()) + + #LOGGER.info('received_samples = {:s}'.format(str(received_samples))) + LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples)))) + LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED))) + assert len(received_samples) == NUM_SAMPLES_EXPECTED + for received_sample in received_samples: + kpi_uuid = received_sample.kpi_id.kpi_id.uuid + assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED + assert isinstance(received_sample.timestamp, str) + try: + timestamp = float(received_sample.timestamp) + except ValueError: + dt_time = dateutil.parser.isoparse(received_sample.timestamp).replace(tzinfo=timezone.utc) + timestamp = float(calendar.timegm(dt_time.timetuple())) + (dt_time.microsecond / 1.e6) + assert timestamp > t_start_monitoring + assert timestamp < t_end_monitoring + assert received_sample.kpi_value.HasField('floatVal') or received_sample.kpi_value.HasField('intVal') + kpi_value = getattr(received_sample.kpi_value, received_sample.kpi_value.WhichOneof('value')) + assert isinstance(kpi_value, (float, int)) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1 + + LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED))) + for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items(): + assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI + + # Unsubscribe monitoring + for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.keys(): + MONITORING_SETTINGS_UNSUBSCRIBE = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsibscribe + 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe + } + device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE)) + + +def test_device_emulated_deconfigure( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now + assert driver is not None + + driver_config = driver.GetConfig() + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_DECONFIG_RULES['device_operational_status'] = \ + DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED + DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ADDRESSES) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) + + RESULTING_CONFIG_RULES = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)} + for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED: + values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value']) + values.update(endpoint_cooked[1]) + RESULTING_CONFIG_RULES[endpoint_cooked[0]]['resource_value'] = json.dumps(values, sort_keys=True) + RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values() + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + driver_config = list(filter( + lambda config_rule: ( + not isinstance(config_rule[1], str) or not config_rule[1].startswith('do_sampling (trigger:')), + driver_config)) + LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES))) + assert len(driver_config) == len(RESULTING_CONFIG_RULES) + for config_rule in RESULTING_CONFIG_RULES: + config_rule = [config_rule['resource_key'], json.loads(config_rule['resource_value'])] + #LOGGER.info('config_rule = {:s}'.format(str(config_rule))) + assert config_rule in driver_config + + DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU) + DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ENDPOINTS) + device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES)) + + driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + assert len(driver_config) == 0 + + device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID)) + config_rules = device_data.device_config.config_rules + LOGGER.info('config_rules = {:s}'.format(str(config_rules))) + clean_config_rules = [] + for config_rule in config_rules: + if config_rule.resource_key.startswith('/endpoints/endpoint'): continue + config_rule_value = json.loads(config_rule.resource_value) + if isinstance(config_rule_value, str) and config_rule_value.startswith('do_sampling (trigger:'): continue + clean_config_rules.append(config_rule) + LOGGER.info('clean_config_rules = {:s}'.format(str(clean_config_rules))) + assert len(clean_config_rules) == 0 + + +def test_device_emulated_delete( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + device_client.DeleteDevice(DeviceId(**DEVICE_EMU_ID)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID, {}) + assert driver is None diff --git a/src/device/tests/test_unitary_microwave.py b/src/device/tests/test_unitary_microwave.py index 8718d99fb9d9d430e49ecd44edc60e5a2e9be339..c5cd70b993971812cd89dd970e9835cfd04a548f 100644 --- a/src/device/tests/test_unitary_microwave.py +++ b/src/device/tests/test_unitary_microwave.py @@ -12,211 +12,61 @@ # See the License for the specific language governing permissions and # limitations under the License. -import calendar, copy, dateutil.parser, grpc, json, logging, operator, os, pytest, queue, time -from datetime import datetime, timezone -from typing import Tuple -from common.orm.Database import Database -from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum -from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum -from common.message_broker.MessageBroker import MessageBroker +import copy, grpc, logging, pytest from common.tools.grpc.Tools import grpc_message_to_json_string -from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id -from context.Config import ( - GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) from context.client.ContextClient import ContextClient -from context.proto.context_pb2 import DeviceId, DeviceOperationalStatusEnum -from context.service.grpc_server.ContextService import ContextService -from device.Config import ( - GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD) +from context.proto.context_pb2 import DeviceId from device.client.DeviceClient import DeviceClient -from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology -from device.proto.device_pb2 import MonitoringSettings -from device.proto.kpi_sample_types_pb2 import KpiSampleType +from device.proto.context_pb2 import ConfigActionEnum, Device from device.service.DeviceService import DeviceService from device.service.driver_api._Driver import _Driver -from device.service.driver_api.DriverFactory import DriverFactory -from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -from device.service.drivers import DRIVERS -from device.tests.MockMonitoringService import MockMonitoringService -from monitoring.Config import ( - GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS, - GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD) -from monitoring.client.monitoring_client import MonitoringClient -from .CommonObjects import CONTEXT, TOPOLOGY - -from .Device_Emulated import ( - DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES, - DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_EP_DESCS, DEVICE_EMU_ENDPOINTS_COOKED, - DEVICE_EMU_ID, DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID) -ENABLE_EMULATED = True - -try: - from .Device_OpenConfig_Infinera1 import( - #from .Device_OpenConfig_Infinera2 import( - DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID, - DEVICE_OC_UUID) - ENABLE_OPENCONFIG = True -except ImportError: - ENABLE_OPENCONFIG = False - -try: - from .Device_Transport_Api_CTTC import ( - DEVICE_TAPI, DEVICE_TAPI_CONNECT_RULES, DEVICE_TAPI_UUID, DEVICE_TAPI_ID, DEVICE_TAPI_CONFIG_RULES, - DEVICE_TAPI_DECONFIG_RULES) - ENABLE_TAPI = True -except ImportError: - ENABLE_TAPI = False +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) try: from .Device_Microwave_Template import ( - DEVICE_MICROWAVE, DEVICE_MICROWAVE_CONNECT_RULES, DEVICE_MICROWAVE_UUID, DEVICE_MICROWAVE_ID, DEVICE_MICROWAVE_CONFIG_RULES, - DEVICE_MICROWAVE_DECONFIG_RULES) + DEVICE_MICROWAVE, DEVICE_MICROWAVE_CONNECT_RULES, DEVICE_MICROWAVE_UUID, DEVICE_MICROWAVE_ID, + DEVICE_MICROWAVE_CONFIG_RULES, DEVICE_MICROWAVE_DECONFIG_RULES) ENABLE_MICROWAVE = True -except ImportError as error: - ENABLE_MICROWAVE = False - print(error.__class__.__name__ + ": " + error.message) - -from .mock_p4runtime_service import MockP4RuntimeService -try: - from .device_p4 import( - DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_NAME, DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS, - DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES, DEVICE_P4_CONFIG_RULES) - ENABLE_P4 = True except ImportError: - ENABLE_P4 = False - -#ENABLE_EMULATED = False # set to False to disable tests of Emulated devices -#ENABLE_OPENCONFIG = False # set to False to disable tests of OpenConfig devices -#ENABLE_TAPI = False # set to False to disable tests of TAPI devices -#ENABLE_P4 = False # set to False to disable tests of P4 devices - -ENABLE_OPENCONFIG_CONFIGURE = True -ENABLE_OPENCONFIG_MONITOR = True -ENABLE_OPENCONFIG_DECONFIGURE = True - - -logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) -logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) -logging.getLogger('monitoring-client').setLevel(logging.WARNING) + ENABLE_MICROWAVE = False LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports -DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports -MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports - -DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' -DEFAULT_REDIS_SERVICE_PORT = 6379 -DEFAULT_REDIS_DATABASE_ID = 0 - -REDIS_CONFIG = { - 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), - 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), - 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), -} - -SCENARIOS = [ - ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), - #('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG), -] - -@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) -def context_db_mb(request) -> Tuple[Database, MessageBroker]: - name,db_backend,db_settings,mb_backend,mb_settings = request.param - msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' - LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) - _database = Database(get_database_backend(backend=db_backend, **db_settings)) - _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) - yield _database, _message_broker - _message_broker.terminate() - -@pytest.fixture(scope='session') -def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - _service = ContextService( - context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS, - grace_period=CONTEXT_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT) - yield _client - _client.close() - -@pytest.fixture(scope='session') -def monitoring_service(): - _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS, - grace_period=MONITORING_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name - _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT) - #yield _client - #_client.close() - return _client - -@pytest.fixture(scope='session') -def device_service( - context_client : ContextClient, # pylint: disable=redefined-outer-name - monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name - - _driver_factory = DriverFactory(DRIVERS) - _driver_instance_cache = DriverInstanceCache(_driver_factory) - _service = DeviceService( - context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, - max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - -@pytest.fixture(scope='session') -def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name - _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT) - yield _client - _client.close() - -@pytest.fixture(scope='session') -def p4runtime_service(): - _service = MockP4RuntimeService( - address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, - max_workers=DEVICE_P4_WORKERS, - grace_period=DEVICE_P4_GRACE_PERIOD) - _service.start() - yield _service - _service.stop() - - -def test_prepare_environment( - context_client : ContextClient, # pylint: disable=redefined-outer-name - device_client : DeviceClient, # pylint: disable=redefined-outer-name - device_service : DeviceService): # pylint: disable=redefined-outer-name - context_client.SetContext(Context(**CONTEXT)) - context_client.SetTopology(Topology(**TOPOLOGY)) +# ----- Test Device Driver Microwave ------------------------------------------------ +def test_device_microwave_add_error_cases( + device_client : DeviceClient): # pylint: disable=redefined-outer-name + if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No TAPI device has been configured') + with pytest.raises(grpc.RpcError) as e: + DEVICE_MICROWAVE_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_MICROWAVE) + DEVICE_MICROWAVE_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONNECT_RULES) + DEVICE_MICROWAVE_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONFIG_RULES) + device_client.AddDevice(Device(**DEVICE_MICROWAVE_WITH_EXTRA_RULES)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) -# ----- Test Device Driver Microwave ------------------------------------------------ def test_device_microwave_add_correct( device_client: DeviceClient, # pylint: disable=redefined-outer-name device_service: DeviceService): # pylint: disable=redefined-outer-name if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured') - + DEVICE_MICROWAVE_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_MICROWAVE) DEVICE_MICROWAVE_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONNECT_RULES) device_client.AddDevice(Device(**DEVICE_MICROWAVE_WITH_CONNECT_RULES)) - driver: _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver: _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID) assert driver is not None @@ -240,7 +90,8 @@ def test_device_microwave_configure( if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured') - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID) assert driver is not None # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. @@ -263,8 +114,6 @@ def test_device_microwave_configure( LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) for config_rule in DEVICE_MICROWAVE_CONFIG_RULES: - #import pdb; - #pdb. set_trace() config_rule = ( ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) assert config_rule in config_rules @@ -277,7 +126,8 @@ def test_device_microwave_deconfigure( if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured') - driver: _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver: _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID) assert driver is not None # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. @@ -312,5 +162,6 @@ def test_device_microwave_delete( if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured') device_client.DeleteDevice(DeviceId(**DEVICE_MICROWAVE_ID)) - driver : _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID, {}) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID, {}) assert driver is None \ No newline at end of file diff --git a/src/device/tests/test_unitary_openconfig.py b/src/device/tests/test_unitary_openconfig.py new file mode 100644 index 0000000000000000000000000000000000000000..968272c04a1304b313d8a988ce0a432e3749a9b8 --- /dev/null +++ b/src/device/tests/test_unitary_openconfig.py @@ -0,0 +1,299 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import calendar, copy, dateutil.parser, grpc, logging, pytest, queue, time +from datetime import datetime, timezone +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.tools.object_factory.EndPoint import json_endpoint_id +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import DeviceId +from device.client.DeviceClient import DeviceClient +from device.proto.context_pb2 import ConfigActionEnum, Device +from device.proto.device_pb2 import MonitoringSettings +from device.proto.kpi_sample_types_pb2 import KpiSampleType +from device.service.DeviceService import DeviceService +from device.service.driver_api._Driver import _Driver +from .MockService_Dependencies import MockService_Dependencies +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) + +try: + from .Device_OpenConfig_Infinera1 import( + #from .Device_OpenConfig_Infinera2 import( + DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID, + DEVICE_OC_UUID) + ENABLE_OPENCONFIG = True +except ImportError: + ENABLE_OPENCONFIG = False + +ENABLE_OPENCONFIG_CONFIGURE = True +ENABLE_OPENCONFIG_MONITOR = True +ENABLE_OPENCONFIG_DECONFIGURE = True + + +logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) +logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) +logging.getLogger('monitoring-client').setLevel(logging.WARNING) + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +# ----- Test Device Driver OpenConfig ------------------------------------------ + +def test_device_openconfig_add_error_cases( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + + with pytest.raises(grpc.RpcError) as e: + DEVICE_OC_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_OC) + DEVICE_OC_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONNECT_RULES) + DEVICE_OC_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONFIG_RULES) + device_client.AddDevice(Device(**DEVICE_OC_WITH_EXTRA_RULES)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + +def test_device_openconfig_add_correct( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + + DEVICE_OC_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_OC) + DEVICE_OC_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONNECT_RULES) + device_client.AddDevice(Device(**DEVICE_OC_WITH_CONNECT_RULES)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now + assert driver is not None + + device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + + +def test_device_openconfig_get( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + + initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_OC_ID)) + LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) + LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + + +def test_device_openconfig_configure( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + if not ENABLE_OPENCONFIG_CONFIGURE: pytest.skip('Skipping test OpenConfig configure') + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now + assert driver is not None + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + DEVICE_OC_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_OC) + DEVICE_OC_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_OC_CONFIG_RULES) + device_client.ConfigureDevice(Device(**DEVICE_OC_WITH_CONFIG_RULES)) + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in DEVICE_OC_CONFIG_RULES: + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) + assert config_rule in config_rules + + +def test_device_openconfig_monitor( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService, # pylint: disable=redefined-outer-name + mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + if not ENABLE_OPENCONFIG_MONITOR: pytest.skip('Skipping test OpenConfig monitor') + + device_uuid = DEVICE_OC_UUID + json_device_id = DEVICE_OC_ID + device_id = DeviceId(**json_device_id) + device_data = context_client.GetDevice(device_id) + #LOGGER.info('device_data = \n{:s}'.format(str(device_data))) + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(device_uuid) # we know the driver exists now + assert driver is not None + + SAMPLING_DURATION_SEC = 60.0 + SAMPLING_INTERVAL_SEC = 15.0 + + MONITORING_SETTINGS_LIST = [] + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {} + for endpoint in device_data.device_endpoints: + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + for sample_type_id in endpoint.kpi_sample_types: + sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '') + kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id)) + monitoring_settings = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'kpi_descriptor': { + 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format( + sample_type_name, endpoint_uuid, device_uuid), + 'kpi_sample_type': sample_type_id, + 'device_id': json_device_id, + 'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid), + }, + 'sampling_duration_s': SAMPLING_DURATION_SEC, + 'sampling_interval_s': SAMPLING_INTERVAL_SEC, + } + MONITORING_SETTINGS_LIST.append(monitoring_settings) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0 + + NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC + NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI + + # Start monitoring the device + t_start_monitoring = datetime.timestamp(datetime.utcnow()) + for monitoring_settings in MONITORING_SETTINGS_LIST: + device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings)) + + # wait to receive the expected number of samples + # if takes more than 1.5 times the sampling duration, assume there is an error + time_ini = time.time() + queue_samples : queue.Queue = mock_service.queue_samples + received_samples = [] + while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5): + try: + received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED) + #LOGGER.info('received_sample = {:s}'.format(str(received_sample))) + received_samples.append(received_sample) + except queue.Empty: + continue + + t_end_monitoring = datetime.timestamp(datetime.utcnow()) + + #LOGGER.info('received_samples = {:s}'.format(str(received_samples))) + LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples)))) + LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED))) + #assert len(received_samples) == NUM_SAMPLES_EXPECTED + for received_sample in received_samples: + kpi_uuid = received_sample.kpi_id.kpi_id.uuid + assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED + assert isinstance(received_sample.timestamp, str) + try: + timestamp = float(received_sample.timestamp) + except ValueError: + dt_time = dateutil.parser.isoparse(received_sample.timestamp).replace(tzinfo=timezone.utc) + timestamp = float(calendar.timegm(dt_time.timetuple())) + (dt_time.microsecond / 1.e6) + assert timestamp > t_start_monitoring + assert timestamp < t_end_monitoring + assert received_sample.kpi_value.HasField('floatVal') or received_sample.kpi_value.HasField('intVal') + kpi_value = getattr(received_sample.kpi_value, received_sample.kpi_value.WhichOneof('value')) + assert isinstance(kpi_value, (float, int)) + KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1 + + LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED))) + # TODO: review why num_samples_received per KPI != NUM_SAMPLES_EXPECTED_PER_KPI + #for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items(): + # assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI + + # Unsubscribe monitoring + for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED: + MONITORING_SETTINGS_UNSUBSCRIBE = { + 'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}}, + 'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsibscribe + 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe + } + device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE)) + + +def test_device_openconfig_deconfigure( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + if not ENABLE_OPENCONFIG_DECONFIGURE: pytest.skip('Skipping test OpenConfig deconfigure') + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_OC_UUID) # we know the driver exists now + assert driver is not None + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + DEVICE_OC_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_OC) + DEVICE_OC_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_OC_DECONFIG_RULES) + device_client.ConfigureDevice(Device(**DEVICE_OC_WITH_DECONFIG_RULES)) + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_OC_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in DEVICE_OC_DECONFIG_RULES: + action_set = ConfigActionEnum.Name(ConfigActionEnum.CONFIGACTION_SET) + config_rule = (action_set, config_rule['resource_key'], config_rule['resource_value']) + assert config_rule not in config_rules + + +def test_device_openconfig_delete( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_OPENCONFIG: pytest.skip('Skipping test: No OpenConfig device has been configured') + + device_client.DeleteDevice(DeviceId(**DEVICE_OC_ID)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_OC_UUID, {}) + assert driver is None diff --git a/src/device/tests/test_unitary_p4.py b/src/device/tests/test_unitary_p4.py new file mode 100644 index 0000000000000000000000000000000000000000..d8a5d37b8f01685f10ab2d2ec967d09312fc4cc4 --- /dev/null +++ b/src/device/tests/test_unitary_p4.py @@ -0,0 +1,146 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, grpc, logging, pytest +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import DeviceId +from device.client.DeviceClient import DeviceClient +from device.proto.context_pb2 import Device +from device.service.DeviceService import DeviceService +from device.service.driver_api._Driver import _Driver +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) + +from .mock_p4runtime_service import MockP4RuntimeService +try: + from .device_p4 import( + DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS, + DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES, DEVICE_P4_CONFIG_RULES) + ENABLE_P4 = True +except ImportError: + ENABLE_P4 = False + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +@pytest.fixture(scope='session') +def p4runtime_service(): + _service = MockP4RuntimeService( + address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT, + max_workers=DEVICE_P4_WORKERS, + grace_period=DEVICE_P4_GRACE_PERIOD) + _service.start() + yield _service + _service.stop() + + +# ----- Test Device Driver P4 -------------------------------------------------- + +def test_device_p4_add_error_cases( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip( + 'Skipping test: No P4 device has been configured') + + with pytest.raises(grpc.RpcError) as e: + device_p4_with_extra_rules = copy.deepcopy(DEVICE_P4) + device_p4_with_extra_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONNECT_RULES) + device_p4_with_extra_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONFIG_RULES) + device_client.AddDevice(Device(**device_p4_with_extra_rules)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + +def test_device_p4_add_correct( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip( + 'Skipping test: No P4 device has been configured') + + device_p4_with_connect_rules = copy.deepcopy(DEVICE_P4) + device_p4_with_connect_rules['device_config']['config_rules'].extend( + DEVICE_P4_CONNECT_RULES) + device_client.AddDevice(Device(**device_p4_with_connect_rules)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_P4_UUID) + assert driver is not None + + +def test_device_p4_get( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip( + 'Skipping test: No P4 device has been configured') + + initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_P4_ID)) + LOGGER.info('initial_config = {:s}'.format( + grpc_message_to_json_string(initial_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_P4_ID)) + LOGGER.info('device_data = {:s}'.format( + grpc_message_to_json_string(device_data))) + + +def test_device_p4_configure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip( + 'Skipping test: No P4 device has been configured') + + pytest.skip('Skipping test for unimplemented method') + + +def test_device_p4_deconfigure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip( + 'Skipping test: No P4 device has been configured') + + pytest.skip('Skipping test for unimplemented method') + + +def test_device_p4_delete( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService, # pylint: disable=redefined-outer-name + p4runtime_service: MockP4RuntimeService): # pylint: disable=redefined-outer-name + + if not ENABLE_P4: pytest.skip('Skipping test: No P4 device has been configured') + + device_client.DeleteDevice(DeviceId(**DEVICE_P4_ID)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_P4_UUID) + assert driver is None diff --git a/src/device/tests/test_unitary_tapi.py b/src/device/tests/test_unitary_tapi.py new file mode 100644 index 0000000000000000000000000000000000000000..ce01619ce6b8144ac81ddd73c700310e23c0b52e --- /dev/null +++ b/src/device/tests/test_unitary_tapi.py @@ -0,0 +1,167 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, grpc, logging, pytest +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import DeviceId +from device.client.DeviceClient import DeviceClient +from device.proto.context_pb2 import ConfigActionEnum, Device +from device.service.DeviceService import DeviceService +from device.service.driver_api._Driver import _Driver +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) + +try: + from .Device_Transport_Api_CTTC import ( + DEVICE_TAPI, DEVICE_TAPI_CONNECT_RULES, DEVICE_TAPI_UUID, DEVICE_TAPI_ID, DEVICE_TAPI_CONFIG_RULES, + DEVICE_TAPI_DECONFIG_RULES) + ENABLE_TAPI = True +except ImportError: + ENABLE_TAPI = False + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +# ----- Test Device Driver TAPI ------------------------------------------------ + +def test_device_tapi_add_error_cases( + device_client : DeviceClient): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + with pytest.raises(grpc.RpcError) as e: + DEVICE_TAPI_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_TAPI) + DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES) + DEVICE_TAPI_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES) + device_client.AddDevice(Device(**DEVICE_TAPI_WITH_EXTRA_RULES)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) + + +def test_device_tapi_add_correct( + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + DEVICE_TAPI_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_TAPI) + DEVICE_TAPI_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONNECT_RULES) + device_client.AddDevice(Device(**DEVICE_TAPI_WITH_CONNECT_RULES)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver: _Driver = driver_instance_cache.get(DEVICE_TAPI_UUID) + assert driver is not None + + +def test_device_tapi_get( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_TAPI_ID)) + LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) + LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data))) + + +def test_device_tapi_configure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_TAPI_UUID) + assert driver is not None + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + DEVICE_TAPI_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_TAPI) + DEVICE_TAPI_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_CONFIG_RULES) + device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_CONFIG_RULES)) + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in DEVICE_TAPI_CONFIG_RULES: + config_rule = ( + ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value']) + assert config_rule in config_rules + + +def test_device_tapi_deconfigure( + context_client: ContextClient, # pylint: disable=redefined-outer-name + device_client: DeviceClient, # pylint: disable=redefined-outer-name + device_service: DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver: _Driver = driver_instance_cache.get(DEVICE_TAPI_UUID) + assert driver is not None + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + DEVICE_TAPI_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_TAPI) + DEVICE_TAPI_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_TAPI_DECONFIG_RULES) + device_client.ConfigureDevice(Device(**DEVICE_TAPI_WITH_DECONFIG_RULES)) + + # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly. + #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0)) + #LOGGER.info('driver_config = {:s}'.format(str(driver_config))) + + device_data = context_client.GetDevice(DeviceId(**DEVICE_TAPI_ID)) + config_rules = [ + (ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value) + for config_rule in device_data.device_config.config_rules + ] + LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format( + '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules]))) + for config_rule in DEVICE_TAPI_DECONFIG_RULES: + action_set = ConfigActionEnum.Name(ConfigActionEnum.CONFIGACTION_SET) + config_rule = (action_set, config_rule['resource_key'], config_rule['resource_value']) + assert config_rule not in config_rules + + +def test_device_tapi_delete( + device_client : DeviceClient, # pylint: disable=redefined-outer-name + device_service : DeviceService): # pylint: disable=redefined-outer-name + + if not ENABLE_TAPI: pytest.skip('Skipping test: No TAPI device has been configured') + + device_client.DeleteDevice(DeviceId(**DEVICE_TAPI_ID)) + driver_instance_cache = device_service.device_servicer.driver_instance_cache + driver : _Driver = driver_instance_cache.get(DEVICE_TAPI_UUID, {}) + assert driver is None