Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, os, pytest
import threading
import time
Francisco-Javier Moreno-Muro
committed
from queue import Queue
from random import random
Javi Moreno
committed
from typing import Tuple
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import STATE_STOPPED
from grpc._channel import _MultiThreadedRendezvous
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
Javi Moreno
committed
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable
Francisco-Javier Moreno-Muro
committed
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float
Javi Moreno
committed
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
Francisco-Javier Moreno-Muro
committed
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.SubscriptionManager import SubscriptionManager
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \
create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \
alarm_subscription
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
###########################
# Tests Setup
###########################
Javi Moreno
committed
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
Javi Moreno
committed
MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
Javi Moreno
committed
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
_database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
_message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
Javi Moreno
committed
yield _database, _message_broker
_message_broker.terminate()
@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
database, message_broker = context_db_mb
Javi Moreno
committed
database.clear_all()
_service = ContextService(database, message_broker)
Javi Moreno
committed
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient()
Javi Moreno
committed
yield _client
_client.close()
@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
_client = DeviceClient()
yield _client
_client.close()
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
context_service : ContextService, # pylint: disable=redefined-outer-name
device_service : DeviceService # pylint: disable=redefined-outer-name
):
LOGGER.info('Initializing MonitoringService...')
_service.start()
# yield the server, when test finishes, execution will resume to stop it
yield _service
LOGGER.info('Terminating MonitoringService...')
_service.stop()
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing MonitoringClient...')
_client = MonitoringClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding MonitoringClient...')
yield _client
Javi Moreno
committed
LOGGER.info('Closing MonitoringClient...')
_client.close()
Javi Moreno
committed
def management_db():
_management_db = ManagementDBTools.ManagementDB('monitoring.db')
return _management_db
def metrics_db():
_metrics_db = MetricsDBTools.MetricsDB(
METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
_scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
_scheduler.start()
return _scheduler
Francisco-Javier Moreno-Muro
committed
metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
for i in range(50):
kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
kpiId = kpi_id_int
deviceId = 'DEV'+ str(kpi_id_int)
endpointId = 'END' + str(kpi_id_int)
serviceId = 'SERV' + str(kpi_id_int)
sliceId = 'SLC' + str(kpi_id_int)
connectionId = 'CON' + str(kpi_id_int)
time_stamp = timestamp_utcnow_to_float()
kpi_value = 500*random()
Francisco-Javier Moreno-Muro
committed
metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
kpi_value)
sleep(0.1)
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_create_kpi requesting')
for i in range(3):
response = monitoring_client.SetKpi(create_kpi_request(str(i+1)))
LOGGER.debug(str(response))
assert isinstance(response, KpiId)
# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('delete_kpi requesting')
response = monitoring_client.SetKpi(create_kpi_request('4'))
response = monitoring_client.DeleteKpi(response)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.SetKpi(create_kpi_request('1'))
response = monitoring_client.GetKpiDescriptor(response)
LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptor)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.GetKpiDescriptorList(Empty())
LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptorList)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_include_kpi requesting')
kpi_id = monitoring_client.SetKpi(create_kpi_request('1'))
LOGGER.debug(str(kpi_id))
response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
LOGGER.info('test_monitor_kpi begin')
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
response = monitoring_client.SetKpi(create_kpi_request('1'))
_monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
response = monitoring_client.MonitorKpi(_monitor_kpi_request)
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's QueryKpiData method
def test_query_kpi_data(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name
kpi_id_list = []
for i in range(2):
kpi_id = monitoring_client.SetKpi(create_kpi_request(str(i+1)))
subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid])
kpi_id_list.append(kpi_id)
LOGGER.warning('test_query_kpi_data')
response = monitoring_client.QueryKpiData(kpi_query(kpi_id_list))
LOGGER.debug(str(response))
assert isinstance(response, RawKpiTable)
if (subs_scheduler.state != STATE_STOPPED):
subs_scheduler.shutdown()
# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client,subs_scheduler): # pylint: disable=redefined-outer-name
LOGGER.warning('test_set_kpi_subscription')
kpi_id = monitoring_client.SetKpi(create_kpi_request('1'))
subs_scheduler.add_job(ingestion_data, args=[kpi_id.kpi_id.uuid])
response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
assert isinstance(response, _MultiThreadedRendezvous)
for item in response:
LOGGER.debug(item)
assert isinstance(item, SubsResponse)
if (subs_scheduler.state != STATE_STOPPED):
subs_scheduler.shutdown()
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Test case that makes use of client fixture to test server's GetSubsDescriptor method
def test_get_subs_descriptor(monitoring_client):
LOGGER.warning('test_get_subs_descriptor')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
for item in response:
response = monitoring_client.GetSubsDescriptor(item.subs_id)
LOGGER.debug(response)
assert isinstance(response, SubsDescriptor)
# Test case that makes use of client fixture to test server's GetSubscriptions method
def test_get_subscriptions(monitoring_client):
LOGGER.warning('test_get_subscriptions')
response = monitoring_client.GetSubscriptions(Empty())
LOGGER.debug(response)
assert isinstance(response, SubsList)
# Test case that makes use of client fixture to test server's DeleteSubscription method
def test_delete_subscription(monitoring_client):
LOGGER.warning('test_delete_subscription')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
for item in subs:
response = monitoring_client.DeleteSubscription(item.subs_id)
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client):
LOGGER.warning('test_set_kpi_alarm')
Francisco-Javier Moreno-Muro
committed
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id))
LOGGER.debug(str(response))
assert isinstance(response, AlarmID)
# Test case that makes use of client fixture to test server's GetAlarms method
def test_get_alarms(monitoring_client):
LOGGER.warning('test_get_alarms')
response = monitoring_client.GetAlarms(Empty())
LOGGER.debug(response)
assert isinstance(response, AlarmList)
# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client):
LOGGER.warning('test_get_alarm_descriptor')
Francisco-Javier Moreno-Muro
committed
_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
_alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
_response = monitoring_client.GetAlarmDescriptor(_alarm_id)
LOGGER.debug(_response)
assert isinstance(_response, AlarmDescriptor)
# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client,subs_scheduler):
LOGGER.warning('test_get_alarm_descriptor')
_kpi_id = monitoring_client.SetKpi(create_kpi_request('3'))
Francisco-Javier Moreno-Muro
committed
_alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
subs_scheduler.add_job(ingestion_data,args=[_kpi_id.kpi_id.uuid])
Francisco-Javier Moreno-Muro
committed
_response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id))
assert isinstance(_response, _MultiThreadedRendezvous)
for item in _response:
LOGGER.debug(item)
assert isinstance(item,AlarmResponse)
if(subs_scheduler.state != STATE_STOPPED):
subs_scheduler.shutdown()
# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client):
LOGGER.warning('test_delete_alarm')
Francisco-Javier Moreno-Muro
committed
_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
_alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
_response = monitoring_client.DeleteAlarm(_alarm_id)
LOGGER.debug(type(_response))
assert isinstance(_response, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
assert isinstance(response, _MultiThreadedRendezvous)
# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_getinstant_kpi begin')
# kpi_id = monitoring_client.SetKpi(KpiId())
# monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
# sleep(0.3)
# response = monitoring_client.GetInstantKpi(kpi_id)
# LOGGER.debug(response)
# assert isinstance(response, Kpi)
def test_managementdb_tools_kpis(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_kpis begin')
_create_kpi_request = create_kpi_request('5')
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid
kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid
_kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id)
assert isinstance(_kpi_id, int)
response = management_db.get_KPI(_kpi_id)
response = management_db.set_monitoring_flag(_kpi_id,True)
assert response is True
response = management_db.check_monitoring_flag(_kpi_id)
assert response is True
management_db.set_monitoring_flag(_kpi_id, False)
response = management_db.check_monitoring_flag(_kpi_id)
assert response is False
response = management_db.get_KPIS()
response = management_db.delete_KPI(_kpi_id)
Francisco-Javier Moreno-Muro
committed
def test_managementdb_tools_insert_alarm(management_db):
LOGGER.warning('test_managementdb_tools_insert_alarm begin')
_alarm_description = "Alarm Description"
_alarm_name = "Alarm Name"
_kpi_id = "3"
_kpi_min_value = 0.0
_kpi_max_value = 250.0
_in_range = True
_include_min_value = False
_include_max_value = True
_alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value,
_kpi_max_value,
_in_range, _include_min_value, _include_max_value)
LOGGER.debug(_alarm_id)
assert isinstance(_alarm_id,int)
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
#
# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
# _kpiId = "6"
#
# for i in range(50):
# _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
# _deviceId = 'DEV4'
# _endpointId = 'END4'
# _serviceId = 'SERV4'
# _sliceId = 'SLC4'
# _connectionId = 'CON4'
# _time_stamp = timestamp_utcnow_to_float()
# _kpi_value = 500*random()
#
# metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId,
# _kpi_value)
# sleep(0.05)
#
# _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'"
# _data = metrics_db.run_query(_query)
# assert len(_data) >= 50
#
# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler):
# LOGGER.warning('test_subscription_manager_create_subscription begin')
# subs_queue = Queue()
#
# subs_manager = SubscriptionManager(metrics_db)
#
# subs_scheduler.add_job(ingestion_data)
#
# kpi_id = "3"
# sampling_duration_s = 20
# sampling_interval_s = 3
# real_start_time = timestamp_utcnow_to_float()
# start_timestamp = real_start_time
# end_timestamp = start_timestamp + sampling_duration_s
#
# subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s,
# sampling_interval_s,start_timestamp,end_timestamp)
# subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,
# sampling_duration_s,start_timestamp,end_timestamp)
#
# # This is here to simulate application activity (which keeps the main thread alive).
# total_points = 0
# while True:
# while not subs_queue.empty():
# list = subs_queue.get_nowait()
# kpi_list = KpiList()
# for item in list:
# kpi = Kpi()
# kpi.kpi_id.kpi_id.uuid = item[0]
# kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
# kpi.kpi_value.floatVal = item[2]
# kpi_list.kpi.append(kpi)
# total_points += 1
# LOGGER.debug(kpi_list)
# if timestamp_utcnow_to_float() > end_timestamp:
# break
#
# assert total_points != 0
Francisco-Javier Moreno-Muro
committed
def test_events_tools(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
LOGGER.warning('test_get_device_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
def test_get_device_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
Javi Moreno
committed
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Check create event -----------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
event = events_collector.get_event(block=True)
Javi Moreno
committed
assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
events_collector.stop()
def test_listen_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
LOGGER.warning('test_listen_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
kpi_id_list = events_collector.listen_events()
events_collector.stop()