Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Javi Moreno
committed
from typing import Tuple
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
Javi Moreno
committed
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor
Javi Moreno
committed
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position
# pylint: disable=wrong-import-position
from monitoring.client.MonitoringClient import MonitoringClient
from common.proto import context_pb2, monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.service import SqliteTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
###########################
# Tests Setup
###########################
Javi Moreno
committed
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
Javi Moreno
committed
MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
Javi Moreno
committed
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
_database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
_message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
Javi Moreno
committed
yield _database, _message_broker
_message_broker.terminate()
@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
database, message_broker = context_db_mb
Javi Moreno
committed
database.clear_all()
_service = ContextService(database, message_broker)
Javi Moreno
committed
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient()
Javi Moreno
committed
yield _client
_client.close()
@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
_client = DeviceClient()
yield _client
_client.close()
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
context_service : ContextService, # pylint: disable=redefined-outer-name
device_service : DeviceService # pylint: disable=redefined-outer-name
):
LOGGER.info('Initializing MonitoringService...')
_service.start()
# yield the server, when test finishes, execution will resume to stop it
yield _service
LOGGER.info('Terminating MonitoringService...')
_service.stop()
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing MonitoringClient...')
_client = MonitoringClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding MonitoringClient...')
yield _client
Javi Moreno
committed
LOGGER.info('Closing MonitoringClient...')
_client.close()
Javi Moreno
committed
_sql_db = SqliteTools.SQLite('monitoring.db')
return _sql_db
def metrics_db():
_metrics_db = MetricsDBTools.MetricsDB(
METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
return _metrics_db
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_create_kpi requesting')
response = monitoring_client.SetKpi(create_kpi_request())
LOGGER.debug(str(response))
assert isinstance(response, KpiId)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
response = monitoring_client.SetKpi(create_kpi_request())
_monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
response = monitoring_client.MonitorKpi(_monitor_kpi_request)
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_include_kpi requesting')
response = monitoring_client.IncludeKpi(include_kpi_request())
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
response = monitoring_client.GetStreamKpi(kpi())
#assert isinstance(response, Kpi)
# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_getinstant_kpi begin')
# response = monitoring_client.GetInstantKpi(kpi_id())
# LOGGER.debug(str(response))
# # assert isinstance(response, Kpi)
# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
response = monitoring_client.SetKpi(create_kpi_request())
response = monitoring_client.GetKpiDescriptor(response)
assert isinstance(response, KpiDescriptor)
Javi Moreno
committed
def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
assert isinstance(response, int)
def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.get_KPI(_kpi_id)
def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
response = sql_db.get_KPIS()
assert isinstance(response, list)
def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
response = sql_db.delete_KPI("DEV1",KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
if not response:
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')
response = sql_db.delete_kpid_id(1)
if not response:
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = sql_db.delete_kpid_id(_kpi_id)
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')
def test_events_tools(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
LOGGER.warning('test_get_device_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
def test_get_device_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
Javi Moreno
committed
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Check create event -----------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
event = events_collector.get_event(block=True)
Javi Moreno
committed
assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
events_collector.stop()
def test_listen_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
LOGGER.warning('test_listen_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
kpi_id_list = events_collector.listen_events()
events_collector.stop()