Commit 0ed8cfee authored by Lluis Gifre Renom

Device component:

- Migrated to use new generic gRPC servicer
- Migrated to use new generic REST servicer
- Migrated to use new settings framework
- Migrated tests to use new generic servicers and mocks
- Solved bug with endpoint monitors' population (duplicated sample types due to wrong key generation)
- Use Emulated driver when neither driver nor device type is specified
- Separated unitary tests and execution scripts per device driver
- Extracted common unitary test functionalities into separate code files
- Minor code styling/formatting
parent 11a69c44
1 merge request: !54 Release 2.0.0
Showing changed files with 719 additions and 395 deletions
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# Useful flags for pytest:
#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_emulated.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_openconfig.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_tapi.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_p4.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_microwave.py
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,18 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-from queue import Queue
-from monitoring.proto.context_pb2 import Empty
-from monitoring.proto.monitoring_pb2 import Kpi
-from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceServicer
-
-LOGGER = logging.getLogger(__name__)
-
-class MockMonitoringServiceServicerImpl(MonitoringServiceServicer):
-    def __init__(self, queue_samples : Queue):
-        self.queue_samples = queue_samples
-
-    def IncludeKpi(self, request : Kpi, context) -> Empty:
-        self.queue_samples.put(request)
-        return Empty()
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+
+# Run unitary tests and analyze coverage of code at same time
+# Useful flags for pytest:
+#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
+coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
+    device/tests/test_unitary_emulated.py

@@ -24,5 +24,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
 # Useful flags for pytest:
 #-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
-coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
+coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
     device/tests/test_unitary_microwave.py
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# Useful flags for pytest:
#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_openconfig.py
@@ -25,4 +25,4 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
 #-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
 coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
-    device/tests/test_unitary.py
+    device/tests/test_unitary_p4.py
#!/bin/bash
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# Useful flags for pytest:
#-o log_cli=true -o log_file=device.log -o log_file_level=DEBUG
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_tapi.py
@@ -12,21 +12,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-
-# General settings
-LOG_LEVEL = logging.WARNING
-
-# gRPC settings
-GRPC_SERVICE_PORT = 2020
-GRPC_MAX_WORKERS = 10
-GRPC_GRACE_PERIOD = 60
-
-# Prometheus settings
-METRICS_PORT = 9192
-
-# Dependency micro-service connection settings
-CONTEXT_SERVICE_HOST = '127.0.0.1'
-CONTEXT_SERVICE_PORT = 1010
-MONITORING_SERVICE_HOST = '127.0.0.1'
-MONITORING_SERVICE_PORT = 7070
@@ -13,7 +13,10 @@
 # limitations under the License.
 import grpc, logging
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_host, get_service_port_grpc
 from common.tools.client.RetryDecorator import retry, delay_exponential
+from common.tools.grpc.Tools import grpc_message_to_json_string
 from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
 from device.proto.device_pb2 import MonitoringSettings
 from device.proto.device_pb2_grpc import DeviceServiceStub

@@ -24,8 +27,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
 RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

 class DeviceClient:
-    def __init__(self, address, port):
-        self.endpoint = '{:s}:{:s}'.format(str(address), str(port))
+    def __init__(self, host=None, port=None):
+        if not host: host = get_service_host(ServiceNameEnum.DEVICE)
+        if not port: port = get_service_port_grpc(ServiceNameEnum.DEVICE)
+        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
         LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
         self.channel = None
         self.stub = None

@@ -37,41 +42,41 @@ class DeviceClient:
         self.stub = DeviceServiceStub(self.channel)

     def close(self):
-        if(self.channel is not None): self.channel.close()
+        if self.channel is not None: self.channel.close()
         self.channel = None
         self.stub = None

     @RETRY_DECORATOR
     def AddDevice(self, request : Device) -> DeviceId:
-        LOGGER.debug('AddDevice request: {:s}'.format(str(request)))
+        LOGGER.debug('AddDevice request: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.AddDevice(request)
-        LOGGER.debug('AddDevice result: {:s}'.format(str(response)))
+        LOGGER.debug('AddDevice result: {:s}'.format(grpc_message_to_json_string(response)))
         return response

     @RETRY_DECORATOR
     def ConfigureDevice(self, request : Device) -> DeviceId:
-        LOGGER.debug('ConfigureDevice request: {:s}'.format(str(request)))
+        LOGGER.debug('ConfigureDevice request: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.ConfigureDevice(request)
-        LOGGER.debug('ConfigureDevice result: {:s}'.format(str(response)))
+        LOGGER.debug('ConfigureDevice result: {:s}'.format(grpc_message_to_json_string(response)))
         return response

     @RETRY_DECORATOR
     def DeleteDevice(self, request : DeviceId) -> Empty:
-        LOGGER.debug('DeleteDevice request: {:s}'.format(str(request)))
+        LOGGER.debug('DeleteDevice request: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.DeleteDevice(request)
-        LOGGER.debug('DeleteDevice result: {:s}'.format(str(response)))
+        LOGGER.debug('DeleteDevice result: {:s}'.format(grpc_message_to_json_string(response)))
         return response

     @RETRY_DECORATOR
     def GetInitialConfig(self, request : DeviceId) -> DeviceConfig:
-        LOGGER.debug('GetInitialConfig request: {:s}'.format(str(request)))
+        LOGGER.debug('GetInitialConfig request: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.GetInitialConfig(request)
-        LOGGER.debug('GetInitialConfig result: {:s}'.format(str(response)))
+        LOGGER.debug('GetInitialConfig result: {:s}'.format(grpc_message_to_json_string(response)))
         return response

     @RETRY_DECORATOR
     def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty:
-        LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request)))
+        LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.MonitorDeviceKpi(request)
-        LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response)))
+        LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(grpc_message_to_json_string(response)))
         return response
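With the new constructor defaults, a test or another component can reach the Device service without passing an address explicitly, provided the standard environment variables are set. A minimal usage sketch, assuming the variable names produced by get_env_var_name follow the <SERVICE>SERVICE_SERVICE_HOST / _SERVICE_PORT_GRPC pattern visible elsewhere in this commit (the port value below is an example, not a project default):

# Hypothetical usage sketch; the env var names mirror the pattern used in this commit.
import os

os.environ['DEVICESERVICE_SERVICE_HOST'] = '127.0.0.1'        # point the client at a local instance
os.environ['DEVICESERVICE_SERVICE_PORT_GRPC'] = '12020'       # example port, not a project default

from device.client.DeviceClient import DeviceClient

# No host/port arguments: the client resolves them via get_service_host()/get_service_port_grpc().
device_client = DeviceClient()
# ... issue RPCs, e.g. device_client.AddDevice(device), then release the channel.
device_client.close()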
@@ -12,76 +12,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import grpc, logging
-from concurrent import futures
-from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
-from grpc_health.v1.health_pb2 import HealthCheckResponse
-from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_port_grpc
 from common.orm.backend.BackendEnum import BackendEnum
 from common.orm.Database import Database
 from common.orm.Factory import get_database_backend
-from context.client.ContextClient import ContextClient
-from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
+from common.tools.service.GenericGrpcService import GenericGrpcService
 from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
-from monitoring.client.monitoring_client import MonitoringClient
 from .driver_api.DriverInstanceCache import DriverInstanceCache
 from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
 from .MonitoringLoops import MonitoringLoops

-BIND_ADDRESS = '0.0.0.0'
-LOGGER = logging.getLogger(__name__)
-
-class DeviceService:
-    def __init__(
-        self, context_client : ContextClient, monitoring_client : MonitoringClient,
-        driver_instance_cache : DriverInstanceCache,
-        address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD):
-
-        self.context_client = context_client
-        self.monitoring_client = monitoring_client
-        self.driver_instance_cache = driver_instance_cache
-        self.address = address
-        self.port = port
-        self.endpoint = None
-        self.max_workers = max_workers
-        self.grace_period = grace_period
-        self.device_servicer = None
-        self.health_servicer = None
-        self.pool = None
-        self.server = None
-        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
-        self.monitoring_loops = MonitoringLoops(monitoring_client, self.database)
-
-    def start(self):
-        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
-        LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
-            str(self.endpoint), str(self.max_workers)))
-
+class DeviceService(GenericGrpcService):
+    def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None:
+        port = get_service_port_grpc(ServiceNameEnum.DEVICE)
+        super().__init__(port, cls_name=cls_name)
+        database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
+        self.monitoring_loops = MonitoringLoops(database)
+        self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops)
+
+    def install_servicers(self):
         self.monitoring_loops.start()
-        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
-        self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
-
-        self.device_servicer = DeviceServiceServicerImpl(
-            self.context_client, self.database, self.driver_instance_cache, self.monitoring_loops)
         add_DeviceServiceServicer_to_server(self.device_servicer, self.server)

-        self.health_servicer = HealthServicer(
-            experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
-        add_HealthServicer_to_server(self.health_servicer, self.server)
-
-        port = self.server.add_insecure_port(self.endpoint)
-        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
-        LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
-        self.server.start()
-        self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
-
-        LOGGER.debug('Service started')
-
     def stop(self):
-        LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
-        self.health_servicer.enter_graceful_shutdown()
-        self.server.stop(self.grace_period)
+        super().stop()
         self.monitoring_loops.stop()
-        LOGGER.debug('Service stopped')
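The boilerplate that disappears here (thread pool, health servicer, port binding, logging) is now presumably owned by the GenericGrpcService base class, with subclasses only registering their servicers in install_servicers(). The base class source is not part of this commit view, so the following is an illustrative sketch of the pattern under that assumption; names other than install_servicers/start/stop/server are placeholders, not the project's API:

# Illustrative sketch only; not the real common.tools.service.GenericGrpcService.
import grpc, logging
from concurrent import futures

class SketchGenericGrpcService:
    def __init__(self, bind_port, bind_address='0.0.0.0', max_workers=10, grace_period=60, cls_name=__name__):
        self.bind_address = bind_address
        self.bind_port = bind_port
        self.grace_period = grace_period
        self.logger = logging.getLogger(cls_name)
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))

    def install_servicers(self):
        # Hook overridden by subclasses (e.g., DeviceService) to register their servicers
        # against self.server; the base class owns the rest of the lifecycle.
        pass

    def start(self):
        self.install_servicers()
        endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.server.add_insecure_port(endpoint)
        self.server.start()
        self.logger.info('Listening on %s...', endpoint)

    def stop(self):
        self.server.stop(self.grace_period)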
@@ -49,11 +49,10 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

 class DeviceServiceServicerImpl(DeviceServiceServicer):
     def __init__(
-        self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache,
-        monitoring_loops : MonitoringLoops):
+        self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops
+    ) -> None:
         LOGGER.debug('Creating Servicer...')
-        self.context_client = context_client
+        self.context_client = ContextClient()
         self.database = database
         self.driver_instance_cache = driver_instance_cache
         self.monitoring_loops = monitoring_loops
...
@@ -18,7 +18,7 @@ from typing import Dict
 from common.orm.Database import Database
 from common.orm.HighLevel import get_object
 from common.orm.backend.Tools import key_to_str
-from monitoring.client.monitoring_client import MonitoringClient
+from monitoring.client.MonitoringClient import MonitoringClient
 from monitoring.proto.monitoring_pb2 import Kpi
 from .database.KpiModel import KpiModel
 from .database.RelationModels import EndPointMonitorKpiModel

@@ -55,8 +55,8 @@ class MonitoringLoop:
         self._collector_thread.join()

 class MonitoringLoops:
-    def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None:
-        self._monitoring_client = monitoring_client
+    def __init__(self, database : Database) -> None:
+        self._monitoring_client = MonitoringClient()
         self._database = database
         self._samples_queue = queue.Queue()
         self._running = threading.Event()

@@ -82,7 +82,6 @@ class MonitoringLoops:
     def start(self):
         self._exporter_thread.start()
-        self._running.set()

     @property
     def is_running(self): return self._running.is_set()

@@ -96,6 +95,7 @@
             LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
             return

+        self._running.set()
         while not self._terminate.is_set():
             try:
                 sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)

@@ -149,3 +149,5 @@
                 }))
             except: # pylint: disable=bare-except
                 LOGGER.exception('Unable to format/send Kpi')
+
+        self._running.clear()
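Moving _running.set() from start() into the exporter loop, and clearing it when the loop exits, makes is_running report the exporter thread's actual state rather than the caller's intent. A minimal standalone sketch of that pattern (unrelated to the project's classes):

import threading, time

class Worker:
    def __init__(self):
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        self._running.set()          # flag goes up only once the loop is actually executing
        while not self._terminate.is_set():
            time.sleep(0.1)          # placeholder for real work (e.g., draining a sample queue)
        self._running.clear()        # flag goes down when the loop exits

    def start(self): self._thread.start()

    def stop(self):
        self._terminate.set()
        self._thread.join()

    @property
    def is_running(self): return self._running.is_set()

worker = Worker()
worker.start()
time.sleep(0.2)
assert worker.is_running
worker.stop()
assert not worker.is_running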
@@ -14,12 +14,10 @@
 import logging, signal, sys, threading
 from prometheus_client import start_http_server
-from common.Settings import get_setting, wait_for_environment_variables
-from context.client.ContextClient import ContextClient
-from device.Config import (
-    CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL,
-    METRICS_PORT, MONITORING_SERVICE_HOST, MONITORING_SERVICE_PORT)
-from monitoring.client.monitoring_client import MonitoringClient
+from common.Constants import ServiceNameEnum
+from common.Settings import (
+    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
+    wait_for_environment_variables)
 from .DeviceService import DeviceService
 from .driver_api.DriverFactory import DriverFactory
 from .driver_api.DriverInstanceCache import DriverInstanceCache

@@ -35,12 +33,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
 def main():
     global LOGGER # pylint: disable=global-statement

-    grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT )
-    max_workers       = get_setting('MAX_WORKERS',                     default=GRPC_MAX_WORKERS )
-    grace_period      = get_setting('GRACE_PERIOD',                    default=GRPC_GRACE_PERIOD)
-    log_level         = get_setting('LOG_LEVEL',                       default=LOG_LEVEL        )
-    metrics_port      = get_setting('METRICS_PORT',                    default=METRICS_PORT     )
-
+    log_level = get_log_level()
     logging.basicConfig(level=log_level)
     logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
     logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)

@@ -48,43 +41,25 @@ def main():
     LOGGER = logging.getLogger(__name__)

     wait_for_environment_variables([
-        'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC',
-        'MONITORINGSERVICE_SERVICE_HOST', 'MONITORINGSERVICE_SERVICE_PORT_GRPC'
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
     ])

-    context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST )
-    context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT )
-    monitoring_service_host = get_setting('MONITORINGSERVICE_SERVICE_HOST', default=MONITORING_SERVICE_HOST)
-    monitoring_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=MONITORING_SERVICE_PORT)
-
     signal.signal(signal.SIGINT,  signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)

     LOGGER.info('Starting...')

     # Start metrics server
+    metrics_port = get_metrics_port()
     start_http_server(metrics_port)

-    # Initialize Context Client
-    if context_service_host is None or context_service_port is None:
-        raise Exception('Wrong address({:s}):port({:s}) of Context component'.format(
-            str(context_service_host), str(context_service_port)))
-    context_client = ContextClient(context_service_host, context_service_port)
-
-    # Initialize Monitoring Client
-    if monitoring_service_host is None or monitoring_service_port is None:
-        raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format(
-            str(monitoring_service_host), str(monitoring_service_port)))
-    monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port)
-
     # Initialize Driver framework
     driver_factory = DriverFactory(DRIVERS)
     driver_instance_cache = DriverInstanceCache(driver_factory)

     # Starting device service
-    grpc_service = DeviceService(
-        context_client, monitoring_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers,
-        grace_period=grace_period)
+    grpc_service = DeviceService(driver_instance_cache)
     grpc_service.start()

     # Wait for Ctrl+C or termination signal
...
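The migration replaces hard-coded environment variable names with helpers from the settings framework. Judging from the literals removed above, get_env_var_name presumably composes <SERVICE>SERVICE_ plus a suffix; the real helper lives in common.Settings and is not shown here, so the following is an assumed-behaviour sketch with stand-in definitions:

# Assumed behaviour of get_env_var_name(); stand-ins, not the project's actual implementation.
from enum import Enum

class ServiceNameEnum(Enum):      # stand-in for common.Constants.ServiceNameEnum
    CONTEXT    = 'context'
    MONITORING = 'monitoring'
    DEVICE     = 'device'

ENVVAR_SUFIX_SERVICE_HOST      = 'SERVICE_HOST'
ENVVAR_SUFIX_SERVICE_PORT_GRPC = 'SERVICE_PORT_GRPC'

def get_env_var_name(service, suffix):
    # e.g. (CONTEXT, SERVICE_HOST) -> 'CONTEXTSERVICE_SERVICE_HOST', matching the literals removed above
    return '{:s}SERVICE_{:s}'.format(service.value.upper(), suffix)

assert get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST) == 'CONTEXTSERVICE_SERVICE_HOST'
assert get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC) == 'DEVICESERVICE_SERVICE_PORT_GRPC'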
@@ -15,6 +15,7 @@
 import logging
 from typing import Dict, List
 from common.orm.Database import Database
+from common.orm.HighLevel import update_or_create_object
 from common.orm.backend.Tools import key_to_str
 from common.orm.fields.EnumeratedField import EnumeratedField
 from common.orm.fields.ForeignKeyField import ForeignKeyField

@@ -72,9 +73,14 @@ def set_endpoint_monitors(database : Database, db_endpoint : EndPointModel, grpc
     db_endpoint_pk = db_endpoint.pk
     for kpi_sample_type in grpc_endpoint_kpi_sample_types:
         orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type)
-        str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name])
-        db_endpoint_kpi_sample_type = EndPointMonitorModel(database, str_endpoint_kpi_sample_type_key)
-        db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint
-        db_endpoint_kpi_sample_type.resource_key = '' # during initialization, allow empty value
-        db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type
-        db_endpoint_kpi_sample_type.save()
+        str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, str(orm_kpi_sample_type.value)])
+        #db_endpoint_kpi_sample_type = EndPointMonitorModel(database, str_endpoint_kpi_sample_type_key)
+        #db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint
+        #db_endpoint_kpi_sample_type.resource_key = '' # during initialization, allow empty value
+        #db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type
+        #db_endpoint_kpi_sample_type.save()
+        update_or_create_object(database, EndPointMonitorModel, str_endpoint_kpi_sample_type_key, {
+            'endpoint_fk'    : db_endpoint,
+            #'resource_key'  : '', # during initialization, allow empty value
+            'kpi_sample_type': orm_kpi_sample_type,
+        })
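This is the fix for the duplicated sample types mentioned in the commit message: the per-endpoint monitor record is now written with an idempotent update-or-create keyed on a stable (endpoint, sample type) key, instead of being re-created on every call. A minimal self-contained sketch of why that matters, using a plain dict rather than the project's ORM:

# Standalone illustration, not the project's common.orm API.
def update_or_create(store: dict, key: str, attributes: dict) -> dict:
    # Re-running this with the same key updates the existing record instead of adding a duplicate.
    record = store.setdefault(key, {})
    record.update(attributes)
    return record

store = {}
for _ in range(2):  # simulate populating the same endpoint's monitors twice
    for sample_type in ('BYTES_RECEIVED', 'BYTES_TRANSMITTED'):
        key = 'endpoint-uuid-1/{:s}'.format(sample_type)
        update_or_create(store, key, {'kpi_sample_type': sample_type})

assert len(store) == 2  # one record per (endpoint, sample type), no duplicates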
@@ -76,6 +76,7 @@ class DriverFactory:
             field_candidate_driver_classes = field_candidate_driver_classes.union(field_indice_drivers)

         if candidate_driver_classes is None:
+            if len(field_candidate_driver_classes) == 0: continue
             candidate_driver_classes = {k:1 for k in field_candidate_driver_classes}
         else:
             for candidate_driver_class in candidate_driver_classes:
...
@@ -23,6 +23,11 @@ from .microwave.IETFApiDriver import IETFApiDriver
 DRIVERS = [
     (EmulatedDriver, [
         {
+            # Driver==unspecified & no device type specified => use Emulated
+            FilterFieldEnum.DRIVER: ORM_DeviceDriverEnum.UNDEFINED,
+        },
+        {
+            # Emulated OLS/Packet Router, specifying Undefined/OpenConfig/TAPI Driver => use EmulatedDriver
             FilterFieldEnum.DEVICE_TYPE: [
                 DeviceTypeEnum.EMULATED_OPTICAL_LINE_SYSTEM,
                 DeviceTypeEnum.EMULATED_PACKET_ROUTER,

@@ -36,18 +41,21 @@ DRIVERS = [
     ]),
     (OpenConfigDriver, [
         {
+            # Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver
             FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER,
             FilterFieldEnum.DRIVER     : ORM_DeviceDriverEnum.OPENCONFIG,
         }
     ]),
     (TransportApiDriver, [
         {
+            # Real OLS, specifying TAPI Driver => use TransportApiDriver
             FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPTICAL_LINE_SYSTEM,
             FilterFieldEnum.DRIVER     : ORM_DeviceDriverEnum.TRANSPORT_API,
         }
     ]),
     (P4Driver, [
         {
+            # Real P4 Switch, specifying P4 Driver => use P4Driver
             FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.P4_SWITCH,
             FilterFieldEnum.DRIVER     : ORM_DeviceDriverEnum.P4,
         }
...
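Together with the DriverFactory change above, the new first filter entry implements the commit-message rule that a device declaring no driver (UNDEFINED) and no recognised device type falls back to the EmulatedDriver. A rough sketch of how such filter matching could work, under the assumption that each entry lists acceptable values per field; this is an illustration, not the project's DriverFactory code, and the names below are placeholders:

# Hypothetical matcher; the table shape mirrors DRIVERS above, the logic is an assumption.
def matches(filter_fields: dict, device_fields: dict) -> bool:
    for field, accepted in filter_fields.items():
        accepted = accepted if isinstance(accepted, list) else [accepted]
        if device_fields.get(field) not in accepted:
            return False
    return True

def select_driver(drivers, device_fields: dict):
    for driver_class, filter_entries in drivers:
        if any(matches(entry, device_fields) for entry in filter_entries):
            return driver_class
    return None

# Example: a device declaring only driver=UNDEFINED matches the new EmulatedDriver entry.
DRIVERS_SKETCH = [
    ('EmulatedDriver',   [{'driver': 'UNDEFINED'}]),
    ('OpenConfigDriver', [{'device_type': 'PACKET_ROUTER', 'driver': 'OPENCONFIG'}]),
]
assert select_driver(DRIVERS_SKETCH, {'driver': 'UNDEFINED'}) == 'EmulatedDriver'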
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from concurrent import futures
from queue import Queue
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
from .MockMonitoringServiceServicerImpl import MockMonitoringServiceServicerImpl

BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)

class MockMonitoringService:
    def __init__(
        self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD):
        self.queue_samples = Queue()
        self.address = address
        self.port = port
        self.endpoint = None
        self.max_workers = max_workers
        self.grace_period = grace_period
        self.monitoring_servicer = None
        self.pool = None
        self.server = None

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
        LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
            str(self.endpoint), str(self.max_workers)))
        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
        self.monitoring_servicer = MockMonitoringServiceServicerImpl(self.queue_samples)
        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
        port = self.server.add_insecure_port(self.endpoint)
        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
        LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
        self.server.start()
        LOGGER.debug('Service started')

    def stop(self):
        LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
        self.server.stop(self.grace_period)
        LOGGER.debug('Service stopped')
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, queue
from typing import Union
from common.Constants import ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.tests.MockServicerImpl_Monitoring import MockServicerImpl_Monitoring
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
LOCAL_HOST = '127.0.0.1'
SERVICE_CONTEXT = ServiceNameEnum.CONTEXT
SERVICE_MONITORING = ServiceNameEnum.MONITORING
class MockService_Dependencies(GenericGrpcService):
    # Mock Service implementing Context and Monitoring to simplify unitary tests of Device

    def __init__(self, bind_port: Union[str, int]) -> None:
        super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        self.context_servicer = MockServicerImpl_Context()
        add_ContextServiceServicer_to_server(self.context_servicer, self.server)

        self.queue_samples = queue.Queue()
        self.monitoring_servicer = MockServicerImpl_Monitoring(queue_samples=self.queue_samples)
        add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)

    def configure_env_vars(self):
        os.environ[get_env_var_name(SERVICE_CONTEXT,    ENVVAR_SUFIX_SERVICE_HOST     )] = str(self.bind_address)
        os.environ[get_env_var_name(SERVICE_CONTEXT,    ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port)
        os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(self.bind_address)
        os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port)
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest, os
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import Context, Topology
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from device.tests.CommonObjects import CONTEXT, TOPOLOGY
from device.tests.MockService_Dependencies import MockService_Dependencies
from monitoring.client.MonitoringClient import MonitoringClient
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
DEVICE_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
@pytest.fixture(scope='session')
def mock_service():
    _service = MockService_Dependencies(MOCKSERVICE_PORT)
    _service.configure_env_vars()
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name
    _client = ContextClient()
    yield _client
    _client.close()

@pytest.fixture(scope='session')
def monitoring_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name
    _client = MonitoringClient()
    yield _client
    _client.close()

@pytest.fixture(scope='session')
def device_service(
    context_client : ContextClient,        # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name
    _driver_factory = DriverFactory(DRIVERS)
    _driver_instance_cache = DriverInstanceCache(_driver_factory)
    _service = DeviceService(_driver_instance_cache)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    _client = DeviceClient()
    yield _client
    _client.close()

def test_prepare_environment(
    context_client : ContextClient,  # pylint: disable=redefined-outer-name
    device_client : DeviceClient,    # pylint: disable=redefined-outer-name
    device_service : DeviceService): # pylint: disable=redefined-outer-name
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))
@@ -12,211 +12,61 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import calendar, copy, dateutil.parser, grpc, json, logging, operator, os, pytest, queue, time
-from datetime import datetime, timezone
-from typing import Tuple
-from common.orm.Database import Database
-from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
-from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
-from common.message_broker.MessageBroker import MessageBroker
+import copy, grpc, logging, pytest
 from common.tools.grpc.Tools import grpc_message_to_json_string
-from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
-from context.Config import (
-    GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
-    GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
 from context.client.ContextClient import ContextClient
-from context.proto.context_pb2 import DeviceId, DeviceOperationalStatusEnum
-from context.service.grpc_server.ContextService import ContextService
-from device.Config import (
-    GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS,
-    GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD)
+from context.proto.context_pb2 import DeviceId
 from device.client.DeviceClient import DeviceClient
-from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology
-from device.proto.device_pb2 import MonitoringSettings
-from device.proto.kpi_sample_types_pb2 import KpiSampleType
+from device.proto.context_pb2 import ConfigActionEnum, Device
 from device.service.DeviceService import DeviceService
 from device.service.driver_api._Driver import _Driver
-from device.service.driver_api.DriverFactory import DriverFactory
-from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
-from device.service.drivers import DRIVERS
-from device.tests.MockMonitoringService import MockMonitoringService
-from monitoring.Config import (
-    GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS,
-    GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD)
-from monitoring.client.monitoring_client import MonitoringClient
-from .CommonObjects import CONTEXT, TOPOLOGY
-from .Device_Emulated import (
-    DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES,
-    DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_EP_DESCS, DEVICE_EMU_ENDPOINTS_COOKED,
-    DEVICE_EMU_ID, DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID)
-ENABLE_EMULATED = True
-
-try:
-    from .Device_OpenConfig_Infinera1 import(
-    #from .Device_OpenConfig_Infinera2 import(
-        DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID,
-        DEVICE_OC_UUID)
-    ENABLE_OPENCONFIG = True
-except ImportError:
-    ENABLE_OPENCONFIG = False
-
-try:
-    from .Device_Transport_Api_CTTC import (
-        DEVICE_TAPI, DEVICE_TAPI_CONNECT_RULES, DEVICE_TAPI_UUID, DEVICE_TAPI_ID, DEVICE_TAPI_CONFIG_RULES,
-        DEVICE_TAPI_DECONFIG_RULES)
-    ENABLE_TAPI = True
-except ImportError:
-    ENABLE_TAPI = False
+from .PrepareTestScenario import ( # pylint: disable=unused-import
+    # be careful, order of symbols is important here!
+    mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment)

 try:
     from .Device_Microwave_Template import (
-        DEVICE_MICROWAVE, DEVICE_MICROWAVE_CONNECT_RULES, DEVICE_MICROWAVE_UUID, DEVICE_MICROWAVE_ID, DEVICE_MICROWAVE_CONFIG_RULES,
-        DEVICE_MICROWAVE_DECONFIG_RULES)
+        DEVICE_MICROWAVE, DEVICE_MICROWAVE_CONNECT_RULES, DEVICE_MICROWAVE_UUID, DEVICE_MICROWAVE_ID,
+        DEVICE_MICROWAVE_CONFIG_RULES, DEVICE_MICROWAVE_DECONFIG_RULES)
     ENABLE_MICROWAVE = True
-except ImportError as error:
-    ENABLE_MICROWAVE = False
-    print(error.__class__.__name__ + ": " + error.message)
-
-from .mock_p4runtime_service import MockP4RuntimeService
-try:
-    from .device_p4 import(
-        DEVICE_P4, DEVICE_P4_ID, DEVICE_P4_UUID, DEVICE_P4_NAME, DEVICE_P4_ADDRESS, DEVICE_P4_PORT, DEVICE_P4_WORKERS,
-        DEVICE_P4_GRACE_PERIOD, DEVICE_P4_CONNECT_RULES, DEVICE_P4_CONFIG_RULES)
-    ENABLE_P4 = True
 except ImportError:
-    ENABLE_P4 = False
+    ENABLE_MICROWAVE = False

-#ENABLE_EMULATED   = False # set to False to disable tests of Emulated devices
-#ENABLE_OPENCONFIG = False # set to False to disable tests of OpenConfig devices
-#ENABLE_TAPI       = False # set to False to disable tests of TAPI devices
-#ENABLE_P4         = False # set to False to disable tests of P4 devices
-
-ENABLE_OPENCONFIG_CONFIGURE   = True
-ENABLE_OPENCONFIG_MONITOR     = True
-ENABLE_OPENCONFIG_DECONFIGURE = True
-
-logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
-logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
-logging.getLogger('monitoring-client').setLevel(logging.WARNING)
-
 LOGGER = logging.getLogger(__name__)
 LOGGER.setLevel(logging.DEBUG)

-CONTEXT_GRPC_SERVICE_PORT    = 10000 + CONTEXT_GRPC_SERVICE_PORT    # avoid privileged ports
-DEVICE_GRPC_SERVICE_PORT     = 10000 + DEVICE_GRPC_SERVICE_PORT     # avoid privileged ports
-MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports
-
-DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
-DEFAULT_REDIS_SERVICE_PORT = 6379
-DEFAULT_REDIS_DATABASE_ID  = 0
-
-REDIS_CONFIG = {
-    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
-    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
-    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
-}
-
-SCENARIOS = [
-    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
-    #('all_redis',   DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
-]
-
-@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
-def context_db_mb(request) -> Tuple[Database, MessageBroker]:
-    name,db_backend,db_settings,mb_backend,mb_settings = request.param
-    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
-    LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
-    _database = Database(get_database_backend(backend=db_backend, **db_settings))
-    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
-    yield _database, _message_broker
-    _message_broker.terminate()
-
-@pytest.fixture(scope='session')
-def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
-    _service = ContextService(
-        context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS,
-        grace_period=CONTEXT_GRPC_GRACE_PERIOD)
-    _service.start()
-    yield _service
-    _service.stop()
-
-@pytest.fixture(scope='session')
-def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
-    _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT)
-    yield _client
-    _client.close()
-
-@pytest.fixture(scope='session')
-def monitoring_service():
-    _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS,
-        grace_period=MONITORING_GRPC_GRACE_PERIOD)
-    _service.start()
-    yield _service
-    _service.stop()
-
-@pytest.fixture(scope='session')
-def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name
-    _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT)
-    #yield _client
-    #_client.close()
-    return _client
-
-@pytest.fixture(scope='session')
-def device_service(
-    context_client : ContextClient,        # pylint: disable=redefined-outer-name
-    monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name
-    _driver_factory = DriverFactory(DRIVERS)
-    _driver_instance_cache = DriverInstanceCache(_driver_factory)
-    _service = DeviceService(
-        context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT,
-        max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD)
-    _service.start()
-    yield _service
-    _service.stop()
-
-@pytest.fixture(scope='session')
-def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
-    _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT)
-    yield _client
-    _client.close()
-
-@pytest.fixture(scope='session')
-def p4runtime_service():
-    _service = MockP4RuntimeService(
-        address=DEVICE_P4_ADDRESS, port=DEVICE_P4_PORT,
-        max_workers=DEVICE_P4_WORKERS,
-        grace_period=DEVICE_P4_GRACE_PERIOD)
-    _service.start()
-    yield _service
-    _service.stop()
-
-def test_prepare_environment(
-    context_client : ContextClient,  # pylint: disable=redefined-outer-name
-    device_client : DeviceClient,    # pylint: disable=redefined-outer-name
-    device_service : DeviceService): # pylint: disable=redefined-outer-name
-    context_client.SetContext(Context(**CONTEXT))
-    context_client.SetTopology(Topology(**TOPOLOGY))
-
-# ----- Test Device Driver Microwave ------------------------------------------------
+
+# ----- Test Device Driver Microwave ------------------------------------------------
+
+def test_device_microwave_add_error_cases(
+    device_client : DeviceClient): # pylint: disable=redefined-outer-name
+
+    if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No TAPI device has been configured')
+
+    with pytest.raises(grpc.RpcError) as e:
+        DEVICE_MICROWAVE_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_MICROWAVE)
+        DEVICE_MICROWAVE_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONNECT_RULES)
+        DEVICE_MICROWAVE_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONFIG_RULES)
+        device_client.AddDevice(Device(**DEVICE_MICROWAVE_WITH_EXTRA_RULES))
+    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
+    msg_head = 'device.device_config.config_rules(['
+    msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\
+               'with "_connect/" tag. Others should be configured after adding the device.'
+    except_msg = str(e.value.details())
+    assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)

 def test_device_microwave_add_correct(
     device_client: DeviceClient,    # pylint: disable=redefined-outer-name
     device_service: DeviceService): # pylint: disable=redefined-outer-name

     if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured')

     DEVICE_MICROWAVE_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_MICROWAVE)
     DEVICE_MICROWAVE_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_MICROWAVE_CONNECT_RULES)
     device_client.AddDevice(Device(**DEVICE_MICROWAVE_WITH_CONNECT_RULES))
-    driver: _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
+    driver_instance_cache = device_service.device_servicer.driver_instance_cache
+    driver: _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
     assert driver is not None

@@ -240,7 +90,8 @@ def test_device_microwave_configure(
     if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured')

-    driver : _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
+    driver_instance_cache = device_service.device_servicer.driver_instance_cache
+    driver : _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
     assert driver is not None

     # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.

@@ -263,8 +114,6 @@ def test_device_microwave_configure(
     LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
         '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
     for config_rule in DEVICE_MICROWAVE_CONFIG_RULES:
-        #import pdb;
-        #pdb. set_trace()
         config_rule = (
             ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value'])
         assert config_rule in config_rules

@@ -277,7 +126,8 @@ def test_device_microwave_deconfigure(
     if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured')

-    driver: _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
+    driver_instance_cache = device_service.device_servicer.driver_instance_cache
+    driver: _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID)
     assert driver is not None

     # Requires to retrieve data from device; might be slow. Uncomment only when needed and test does not pass directly.

@@ -312,5 +162,6 @@ def test_device_microwave_delete(
     if not ENABLE_MICROWAVE: pytest.skip('Skipping test: No MICROWAVE device has been configured')

     device_client.DeleteDevice(DeviceId(**DEVICE_MICROWAVE_ID))
-    driver : _Driver = device_service.driver_instance_cache.get(DEVICE_MICROWAVE_UUID, {})
+    driver_instance_cache = device_service.device_servicer.driver_instance_cache
+    driver : _Driver = driver_instance_cache.get(DEVICE_MICROWAVE_UUID, {})
     assert driver is None
\ No newline at end of file