Skip to content
test_unitary.py 30.1 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, os, pytest
from time import sleep

from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from grpc._channel import _MultiThreadedRendezvous

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float

from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.SubscriptionManager import SubscriptionManager
os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS  # pylint: disable=wrong-import-position

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service import ManagementDBTools, MetricsDBTools
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request, \
    create_kpi_request_b, create_kpi_request_c, kpi_query, subs_descriptor, alarm_descriptor, \
    alarm_subscription
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID

from monitoring.service.MonitoringServiceServicerImpl import LOGGER
Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Setup
###########################
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOCAL_HOST = '127.0.0.1'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

Javi Moreno's avatar
Javi Moreno committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
    _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    yield _database, _message_broker
    _message_broker.terminate()

@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    database, message_broker = context_db_mb
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service = ContextService(database, message_broker)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    _client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    _client = DeviceClient()
    yield _client
    _client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
        context_service : ContextService,  # pylint: disable=redefined-outer-name
        device_service : DeviceService     # pylint: disable=redefined-outer-name
    ):
    LOGGER.info('Initializing MonitoringService...')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service = MonitoringService()

    # yield the server, when test finishes, execution will resume to stop it
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('Yielding MonitoringService...')

    LOGGER.info('Terminating MonitoringService...')

# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
    LOGGER.info('Initializing MonitoringClient...')
    _client = MonitoringClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding MonitoringClient...')
    yield _client
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('Closing MonitoringClient...')
    _client.close()
Javi Moreno's avatar
Javi Moreno committed
@pytest.fixture(scope='session')
def management_db():
    _management_db = ManagementDBTools.ManagementDB('monitoring.db')
    return _management_db
Javi Moreno's avatar
Javi Moreno committed

@pytest.fixture(scope='session')
def metrics_db():
    _metrics_db = MetricsDBTools.MetricsDB(
        METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
    return _metrics_db

@pytest.fixture(scope='session')
def subs_scheduler():
    _scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
    _scheduler.start()

    return _scheduler

def ingestion_data():
    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")
        kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
        kpiId = "3"
        deviceId = 'DEV3'
        endpointId = 'END3'
        serviceId = 'SERV3'
        sliceId = 'SLC3'
        connectionId = 'CON3'
        time_stamp = timestamp_utcnow_to_float()
        kpi_value = 500*random()

        metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
                                  kpi_value)
Javi Moreno's avatar
Javi Moreno committed
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_create_kpi requesting')
    response = monitoring_client.SetKpi(create_kpi_request())
    response = monitoring_client.SetKpi(create_kpi_request_b())
    LOGGER.debug(str(response))
    assert isinstance(response, KpiId)

# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    # make call to server
    LOGGER.warning('delete_kpi requesting')
    response = monitoring_client.SetKpi(create_kpi_request_b())
    response = monitoring_client.DeleteKpi(response)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)

# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpidescritor(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getkpidescritor_kpi begin')
    response = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.GetKpiDescriptor(response)
    LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptor)

# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpi_descriptor_list(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getkpidescritor_kpi begin')
    response = monitoring_client.GetKpiDescriptorList(Empty())
    LOGGER.debug(str(response))
    assert isinstance(response, KpiDescriptorList)

# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    # make call to server
    LOGGER.warning('test_include_kpi requesting')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)

# Test case that makes use of client fixture to test server's MonitorKpi method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_monitor_kpi(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    LOGGER.info('test_monitor_kpi begin')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_database = context_db_mb[0]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Update the object ------------------------------------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    response = monitoring_client.SetKpi(create_kpi_request())
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
    response = monitoring_client.MonitorKpi(_monitor_kpi_request)
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's QueryKpiData method
def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_query_kpi_data')
    response = monitoring_client.QueryKpiData(kpi_query())
    LOGGER.debug(str(response))
    assert isinstance(response, KpiList)

# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_set_kpi_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    assert isinstance(response, _MultiThreadedRendezvous)
    for item in response:
        LOGGER.debug(item)
        assert isinstance(item, SubsResponse)

# Test case that makes use of client fixture to test server's GetSubsDescriptor method
def test_get_subs_descriptor(monitoring_client):
    LOGGER.warning('test_get_subs_descriptor')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in response:
        response = monitoring_client.GetSubsDescriptor(item.subs_id)
        LOGGER.debug(response)
        assert isinstance(response, SubsDescriptor)

# Test case that makes use of client fixture to test server's GetSubscriptions method
def test_get_subscriptions(monitoring_client):
    LOGGER.warning('test_get_subscriptions')
    response = monitoring_client.GetSubscriptions(Empty())
    LOGGER.debug(response)
    assert isinstance(response, SubsList)

# Test case that makes use of client fixture to test server's DeleteSubscription method
def test_delete_subscription(monitoring_client):
    LOGGER.warning('test_delete_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subs = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    for item in subs:
        response = monitoring_client.DeleteSubscription(item.subs_id)
        assert isinstance(response, Empty)

# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client):
    LOGGER.warning('test_set_kpi_alarm')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id))
    LOGGER.debug(str(response))
    assert isinstance(response, AlarmID)

# Test case that makes use of client fixture to test server's GetAlarms method
def test_get_alarms(monitoring_client):
    LOGGER.warning('test_get_alarms')
    response = monitoring_client.GetAlarms(Empty())
    LOGGER.debug(response)
    assert isinstance(response, AlarmList)

# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client):
    LOGGER.warning('test_get_alarm_descriptor')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.GetAlarmDescriptor(_alarm_id)
    LOGGER.debug(_response)
    assert isinstance(_response, AlarmDescriptor)

# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client,subs_scheduler):
    LOGGER.warning('test_get_alarm_descriptor')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    subs_scheduler.add_job(ingestion_data)
    _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id))
    assert isinstance(_response, _MultiThreadedRendezvous)
    for item in _response:
        assert isinstance(item,AlarmResponse)

# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client):
    LOGGER.warning('test_delete_alarm')
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.DeleteAlarm(_alarm_id)
    LOGGER.debug(type(_response))
    assert isinstance(_response, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.debug(str(response))
    assert isinstance(response, _MultiThreadedRendezvous)
Javi Moreno's avatar
 
Javi Moreno committed

# Test case that makes use of client fixture to test server's GetInstantKpi method
# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_getinstant_kpi begin')
#     kpi_id = monitoring_client.SetKpi(KpiId())
#     monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
#     sleep(0.3)
#     response = monitoring_client.GetInstantKpi(kpi_id)
#     LOGGER.debug(response)
#     assert isinstance(response, Kpi)
def test_managementdb_tools_insert_kpi(management_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_managementdb_tools_insert_kpi begin')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid
    kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid
Javi Moreno's avatar
Javi Moreno committed

    response = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id)
Javi Moreno's avatar
Javi Moreno committed
    assert isinstance(response, int)

def test_managementdb_tools_get_kpi(management_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_managementdb_tools_get_kpi begin')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member
    kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid
    kpi_connection_id  = _create_kpi_request.connection_id.connection_uuid.uuid
    _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id,kpi_slice_id,kpi_connection_id)
    response = management_db.get_KPI(_kpi_id)
Javi Moreno's avatar
Javi Moreno committed
    assert isinstance(response, tuple)

def test_managementdb_tools_get_kpis(management_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_managementdb_tools_get_kpis begin')
    response = management_db.get_KPIS()
Javi Moreno's avatar
Javi Moreno committed
    assert isinstance(response, list)

def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_managementdb_tools_get_kpi begin')
Javi Moreno's avatar
Javi Moreno committed

    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description  # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type  # pylint: disable=maybe-no-member
    kpi_device_id = _create_kpi_request.device_id.device_uuid.uuid  # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid  # pylint: disable=maybe-no-member
    kpi_service_id = _create_kpi_request.service_id.service_uuid.uuid  # pylint: disable=maybe-no-member
    kpi_slice_id = _create_kpi_request.slice_id.slice_uuid.uuid
    kpi_connection_id = _create_kpi_request.connection_id.connection_uuid.uuid
Javi Moreno's avatar
Javi Moreno committed

    _kpi_id = management_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id,
                                        kpi_service_id, kpi_slice_id, kpi_connection_id)
Javi Moreno's avatar
Javi Moreno committed

    response = management_db.delete_KPI(_kpi_id)
Javi Moreno's avatar
Javi Moreno committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response
Javi Moreno's avatar
Javi Moreno committed

def test_managementdb_tools_insert_alarm(management_db):
    LOGGER.warning('test_managementdb_tools_insert_alarm begin')

    _alarm_description  = "Alarm Description"
    _alarm_name         = "Alarm Name"
    _kpi_id             = "3"
    _kpi_min_value      = 0.0
    _kpi_max_value      = 250.0
    _in_range           = True
    _include_min_value  = False
    _include_max_value  = True

    _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value,
                                               _kpi_max_value,
                                               _in_range, _include_min_value, _include_max_value)
    LOGGER.debug(_alarm_id)
    assert isinstance(_alarm_id,int)
#
# def test_metrics_db_tools(metrics_db): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_metric_sdb_tools_write_kpi begin')
#     _kpiId = "6"
#
#     for i in range(50):
#         _kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
#         _deviceId = 'DEV4'
#         _endpointId = 'END4'
#         _serviceId = 'SERV4'
#         _sliceId = 'SLC4'
#         _connectionId = 'CON4'
#         _time_stamp = timestamp_utcnow_to_float()
#         _kpi_value = 500*random()
#
#         metrics_db.write_KPI(_time_stamp, _kpiId, _kpiSampleType, _deviceId, _endpointId, _serviceId, _sliceId, _connectionId,
#                                   _kpi_value)
#         sleep(0.05)
#
#     _query = f"SELECT * FROM monitoring WHERE kpi_id ='{_kpiId}'"
#     _data = metrics_db.run_query(_query)
#     assert len(_data) >= 50
#
# def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler):
#     LOGGER.warning('test_subscription_manager_create_subscription begin')
#     subs_queue = Queue()
#
#     subs_manager = SubscriptionManager(metrics_db)
#
#     subs_scheduler.add_job(ingestion_data)
#
#     kpi_id = "3"
#     sampling_duration_s = 20
#     sampling_interval_s = 3
#     real_start_time     = timestamp_utcnow_to_float()
#     start_timestamp     = real_start_time
#     end_timestamp       = start_timestamp + sampling_duration_s
#
#     subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s,
#                                                sampling_interval_s,start_timestamp,end_timestamp)
#     subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,
#                                      sampling_duration_s,start_timestamp,end_timestamp)
#
#     # This is here to simulate application activity (which keeps the main thread alive).
#     total_points = 0
#     while True:
#         while not subs_queue.empty():
#             list = subs_queue.get_nowait()
#             kpi_list = KpiList()
#             for item in list:
#                 kpi = Kpi()
#                 kpi.kpi_id.kpi_id.uuid = item[0]
#                 kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
#                 kpi.kpi_value.floatVal = item[2]
#                 kpi_list.kpi.append(kpi)
#                 total_points += 1
#             LOGGER.debug(kpi_list)
#         if timestamp_utcnow_to_float() > end_timestamp:
#             break
#
#     assert total_points != 0
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_events_tools(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsDeviceCollector()
Javi Moreno's avatar
Javi Moreno committed
    events_collector.start()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
Javi Moreno's avatar
Javi Moreno committed
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Update the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno's avatar
Javi Moreno committed

    events_collector.stop()


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_get_device_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
Javi Moreno's avatar
Javi Moreno committed
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsDeviceCollector()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Check create event -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    event = events_collector.get_event(block=True)
    assert isinstance(event, DeviceEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def test_listen_events(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name
        device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
        monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
        context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):

    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsDeviceCollector()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID
Javi Moreno's avatar
Javi Moreno committed
    kpi_id_list = events_collector.listen_events()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert len(kpi_id_list) > 0
    events_collector.stop()