Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, os, pytest
import threading
import time
Javi Moreno
committed
from typing import Tuple
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from grpc._channel import _MultiThreadedRendezvous
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
Javi Moreno
committed
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
Javi Moreno
committed
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS # pylint: disable=wrong-import-position
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \
create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \
alarm_subscription
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
###########################
# Tests Setup
###########################
Javi Moreno
committed
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
Javi Moreno
committed
MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
Javi Moreno
committed
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
_database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
_message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
Javi Moreno
committed
yield _database, _message_broker
_message_broker.terminate()
@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
database, message_broker = context_db_mb
Javi Moreno
committed
database.clear_all()
_service = ContextService(database, message_broker)
Javi Moreno
committed
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
_client = ContextClient()
Javi Moreno
committed
yield _client
_client.close()
@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
_client = DeviceClient()
yield _client
_client.close()
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
context_service : ContextService, # pylint: disable=redefined-outer-name
device_service : DeviceService # pylint: disable=redefined-outer-name
):
LOGGER.info('Initializing MonitoringService...')
_service.start()
# yield the server, when test finishes, execution will resume to stop it
yield _service
LOGGER.info('Terminating MonitoringService...')
_service.stop()
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
LOGGER.info('Initializing MonitoringClient...')
_client = MonitoringClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding MonitoringClient...')
yield _client
Javi Moreno
committed
LOGGER.info('Closing MonitoringClient...')
_client.close()
Javi Moreno
committed
def management_db():
_management_db = ManagementDBTools.ManagementDB('monitoring.db')
return _management_db
def metrics_db():
_metrics_db = MetricsDBTools.MetricsDB(
METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
_scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
_scheduler.start()
return _scheduler
def ingestion_data(monitoring_client):
_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
_include_kpi_request = include_kpi_request(_kpi_id)
for i in range(200):
_include_kpi_request = include_kpi_request(_kpi_id)
monitoring_client.IncludeKpi(_include_kpi_request)
time.sleep(0.01)
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_create_kpi requesting')
response = monitoring_client.SetKpi(create_kpi_request())
LOGGER.debug(str(response))
response = monitoring_client.SetKpi(create_kpi_request_b())
LOGGER.debug(str(response))
assert isinstance(response, KpiId)
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('delete_kpi requesting')
response = monitoring_client.SetKpi(create_kpi_request_b())
response = monitoring_client.DeleteKpi(response)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.SetKpi(create_kpi_request_c())
response = monitoring_client.GetKpiDescriptor(response)
LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptor)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.GetKpiDescriptorList(Empty())
LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptorList)
# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_include_kpi requesting')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
LOGGER.info('test_monitor_kpi begin')
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
response = monitoring_client.SetKpi(create_kpi_request())
_monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
response = monitoring_client.MonitorKpi(_monitor_kpi_request)
assert isinstance(response, Empty)
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# Test case that makes use of client fixture to test server's QueryKpiData method
def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_query_kpi_data')
response = monitoring_client.QueryKpiData(kpi_query())
LOGGER.debug(str(response))
assert isinstance(response, KpiList)
def test_ingestion_data(monitoring_client):
_kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
_include_kpi_request = include_kpi_request(_kpi_id)
for i in range(100):
_include_kpi_request = include_kpi_request(_kpi_id)
monitoring_client.IncludeKpi(_include_kpi_request)
time.sleep(0.01)
# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler):
# subs_scheduler.add_job(ingestion_data(monitoring_client),id="1")
# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_set_kpi_subscription')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
# thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db))
# thread.start()
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
assert isinstance(response, _MultiThreadedRendezvous)
LOGGER.debug(response)
for item in response:
LOGGER.debug(item)
assert isinstance(item, SubsResponse)
# Test case that makes use of client fixture to test server's GetSubsDescriptor method
def test_get_subs_descriptor(monitoring_client):
LOGGER.warning('test_get_subs_descriptor')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
for item in response:
response = monitoring_client.GetSubsDescriptor(item.subs_id)
LOGGER.debug(response)
assert isinstance(response, SubsDescriptor)
# Test case that makes use of client fixture to test server's GetSubscriptions method
def test_get_subscriptions(monitoring_client):
LOGGER.warning('test_get_subscriptions')
response = monitoring_client.GetSubscriptions(Empty())
LOGGER.debug(response)
assert isinstance(response, SubsList)
# Test case that makes use of client fixture to test server's DeleteSubscription method
def test_delete_subscription(monitoring_client):
LOGGER.warning('test_delete_subscription')
kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
for item in subs:
response = monitoring_client.DeleteSubscription(item.subs_id)
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client):
LOGGER.warning('test_set_kpi_alarm')
response = monitoring_client.SetKpiAlarm(alarm_descriptor())
LOGGER.debug(str(response))
assert isinstance(response, AlarmID)
# Test case that makes use of client fixture to test server's GetAlarms method
def test_get_alarms(monitoring_client):
LOGGER.warning('test_get_alarms')
response = monitoring_client.GetAlarms(Empty())
LOGGER.debug(response)
assert isinstance(response, AlarmList)
# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client):
LOGGER.warning('test_get_alarm_descriptor')
alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
response = monitoring_client.GetAlarmDescriptor(alarm_id)
LOGGER.debug(response)
assert isinstance(response, AlarmDescriptor)
# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client):
LOGGER.warning('test_get_alarm_descriptor')
alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id))
assert isinstance(response, _MultiThreadedRendezvous)
for item in response:
LOGGER.debug(response)
assert isinstance(item,AlarmResponse)
# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client):
LOGGER.warning('test_delete_alarm')
alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
response = monitoring_client.DeleteAlarm(alarm_id)
LOGGER.debug(type(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
assert isinstance(response, _MultiThreadedRendezvous)
# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getinstant_kpi begin')
kpi_id = monitoring_client.SetKpi(KpiId())
monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
sleep(0.3)
response = monitoring_client.GetInstantKpi(kpi_id)
LOGGER.debug(response)
assert isinstance(response, Kpi)
response = monitoring_client.GetInstantKpi(KpiId())
LOGGER.debug(type(response))
assert response.kpi_id.kpi_id.uuid == "NoID"
def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_insert_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_get_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
response = management_db.get_KPI(_kpi_id)
def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_get_kpis begin')
response = management_db.get_KPIS()
def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_managementdb_tools_get_kpi begin')
_create_kpi_request = create_kpi_request()
kpi_description = _create_kpi_request.kpi_description # pylint: disable=maybe-no-member
kpi_sample_type = _create_kpi_request.kpi_sample_type # pylint: disable=maybe-no-member
kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid # pylint: disable=maybe-no-member
kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid # pylint: disable=maybe-no-member
_kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id,
kpi_service_id)
response = management_db.delete_KPI(_kpi_id)
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')
def test_events_tools(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
LOGGER.warning('test_get_device_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Update the object ------------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
def test_get_device_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
Javi Moreno
committed
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Check create event -----------------------------------------------------------------------------------------
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
event = events_collector.get_event(block=True)
Javi Moreno
committed
assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
events_collector.stop()
def test_listen_events(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
):
Javi Moreno
committed
LOGGER.warning('test_listen_events begin')
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Javi Moreno
committed
events_collector.start()
# ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno
committed
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno
committed
kpi_id_list = events_collector.listen_events()
events_collector.stop()