Commit 9a69ab50 authored by Francisco-Javier Moreno-Muro's avatar Francisco-Javier Moreno-Muro
Browse files

Improving unit tests and solving issues with Subscription and Alarm Managers (not finished)

parent e0bd3455
Loading
Loading
Loading
Loading
+16 −14
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmLis
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float

from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient
@@ -277,6 +277,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                                                  start_timestamp, end_timestamp)

            # parse queue to append kpis into the list
            while True:
                while not subs_queue.empty():
                    list = subs_queue.get_nowait()
                    for item in list:
@@ -285,6 +286,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                        kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                        kpi.kpi_value.floatVal = item[2]  # This must be improved
                        subs_response.kpi_list.kpi.append(kpi)
                if timestamp_utcnow_to_float() > end_timestamp:
                    break

            subs_response.subs_id.subs_id.uuid = str(subs_id)

@@ -433,6 +436,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        LOGGER.info('GetAlarmDescriptor')
        try:
            alarm_id = request.alarm_id.uuid
            LOGGER.debug(alarm_id)
            alarm = self.management_db.get_alarm(alarm_id)
            response = AlarmDescriptor()

@@ -463,15 +467,13 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        LOGGER.info('GetAlarmResponseStream')
        try:
            alarm_id = request.alarm_id.alarm_id.uuid
            alarm = self.management_db.get_alarm(alarm_id)
            alarm_data = self.management_db.get_alarm(alarm_id)
            alarm_response = AlarmResponse()

            if alarm:

            if alarm_data:
                LOGGER.debug(f"{alarm_data}")
                alarm_queue = Queue()

                alarm_data = self.management_db.get_alarm(alarm)

                alarm_id = request.alarm_id.alarm_id.uuid
                kpi_id = alarm_data[3]
                kpiMinValue = alarm_data[4]
+14 −8
Original line number Diff line number Diff line
@@ -83,12 +83,18 @@ def kpi_query():
def subs_descriptor(kpi_id):
    _subs_descriptor = monitoring_pb2.SubsDescriptor()

    sampling_duration_s = 20
    sampling_interval_s = 3
    real_start_time     = timestamp_utcnow_to_float()
    start_timestamp     = real_start_time
    end_timestamp       = start_timestamp + sampling_duration_s

    _subs_descriptor.subs_id.subs_id.uuid       = ""
    _subs_descriptor.kpi_id.kpi_id.uuid         = kpi_id.kpi_id.uuid
    _subs_descriptor.sampling_duration_s        = 10
    _subs_descriptor.sampling_interval_s        = 2
    _subs_descriptor.start_timestamp.timestamp  = timestamp_utcnow_to_float()
    _subs_descriptor.end_timestamp.timestamp    = timestamp_utcnow_to_float() + 10
    _subs_descriptor.sampling_duration_s        = sampling_duration_s
    _subs_descriptor.sampling_interval_s        = sampling_interval_s
    _subs_descriptor.start_timestamp.timestamp  = start_timestamp
    _subs_descriptor.end_timestamp.timestamp    = end_timestamp

    return _subs_descriptor

@@ -97,14 +103,14 @@ def subs_id():

    return _subs_id

def alarm_descriptor():
def alarm_descriptor(kpi_id):
    _alarm_descriptor = monitoring_pb2.AlarmDescriptor()

    _alarm_descriptor.alarm_description                     = "Alarm Description"
    _alarm_descriptor.name                                  = "Alarm Name"
    _alarm_descriptor.kpi_id.kpi_id.uuid                    = "1"
    _alarm_descriptor.kpi_id.kpi_id.uuid                    = kpi_id.kpi_id.uuid
    _alarm_descriptor.kpi_value_range.kpiMinValue.floatVal  = 0.0
    _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal  = 50.0
    _alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal  = 250.0
    _alarm_descriptor.kpi_value_range.inRange               = True
    _alarm_descriptor.kpi_value_range.includeMinValue       = False
    _alarm_descriptor.kpi_value_range.includeMaxValue       = True
@@ -121,7 +127,7 @@ def alarm_descriptor_b():
def alarm_subscription(alarm_id):
    _alarm_descriptor = monitoring_pb2.AlarmSubscription()

    _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id)
    _alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm_id.alarm_id.uuid)

    return _alarm_descriptor

+143 −37
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@
import copy, os, pytest
import threading
import time
from queue import Queue
from random import random
from time import sleep
from typing import Tuple

@@ -34,6 +36,7 @@ from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, KpiList, SubsDescriptor, SubsList, AlarmID, \
    AlarmDescriptor, AlarmList, Kpi, KpiDescriptorList, SubsResponse, AlarmResponse
from common.tools.timestamp.Converters import timestamp_utcnow_to_float, timestamp_string_to_float

from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
@@ -43,6 +46,9 @@ from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.SubscriptionManager import SubscriptionManager

os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS  # pylint: disable=wrong-import-position
@@ -175,14 +181,23 @@ def subs_scheduler():

    return _scheduler

def ingestion_data(monitoring_client):
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _include_kpi_request = include_kpi_request(_kpi_id)
def ingestion_data():
    metrics_db = MetricsDB("localhost", "9009", "9000", "monitoring")

    for i in range(200):
        _include_kpi_request = include_kpi_request(_kpi_id)
        monitoring_client.IncludeKpi(_include_kpi_request)
        time.sleep(0.01)
        kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
        kpiId = "3"
        deviceId = 'DEV3'
        endpointId = 'END3'
        serviceId = 'SERV3'
        sliceId = 'SLC3'
        connectionId = 'CON3'
        time_stamp = timestamp_utcnow_to_float()
        kpi_value = 500*random()

        metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
                                  kpi_value)
        sleep(0.05)

###########################
# Tests Implementation
@@ -274,28 +289,13 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na
    LOGGER.debug(str(response))
    assert isinstance(response, KpiList)

def test_ingestion_data(monitoring_client):
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _include_kpi_request = include_kpi_request(_kpi_id)

    for i in range(100):
        _include_kpi_request = include_kpi_request(_kpi_id)
        monitoring_client.IncludeKpi(_include_kpi_request)
        time.sleep(0.01)

# def test_subscription_scheduler(monitoring_client,metrics_db,subs_scheduler):
#     subs_scheduler.add_job(ingestion_data(monitoring_client),id="1")

# Test case that makes use of client fixture to test server's SetKpiSubscription method
def test_set_kpi_subscription(monitoring_client,metrics_db): # pylint: disable=redefined-outer-name
def test_set_kpi_subscription(monitoring_client,metrics_db,subs_scheduler): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_set_kpi_subscription')
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    # thread = threading.Thread(target=test_ingestion_data, args=(monitoring_client,metrics_db))
    # thread.start()
    monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
    subs_scheduler.add_job(ingestion_data)
    response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id))
    assert isinstance(response, _MultiThreadedRendezvous)
    LOGGER.debug(response)
    for item in response:
        LOGGER.debug(item)
        assert isinstance(item, SubsResponse)
@@ -331,7 +331,8 @@ def test_delete_subscription(monitoring_client):
# Test case that makes use of client fixture to test server's SetKpiAlarm method
def test_set_kpi_alarm(monitoring_client):
    LOGGER.warning('test_set_kpi_alarm')
    response = monitoring_client.SetKpiAlarm(alarm_descriptor())
    kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    response = monitoring_client.SetKpiAlarm(alarm_descriptor(kpi_id))
    LOGGER.debug(str(response))
    assert isinstance(response, AlarmID)

@@ -345,28 +346,31 @@ def test_get_alarms(monitoring_client):
# Test case that makes use of client fixture to test server's GetAlarmDescriptor method
def test_get_alarm_descriptor(monitoring_client):
    LOGGER.warning('test_get_alarm_descriptor')
    alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    response = monitoring_client.GetAlarmDescriptor(alarm_id)
    LOGGER.debug(response)
    assert isinstance(response, AlarmDescriptor)
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.GetAlarmDescriptor(_alarm_id)
    LOGGER.debug(_response)
    assert isinstance(_response, AlarmDescriptor)

# Test case that makes use of client fixture to test server's GetAlarmResponseStream method
def test_get_alarm_response_stream(monitoring_client):
    LOGGER.warning('test_get_alarm_descriptor')
    alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    response = monitoring_client.GetAlarmResponseStream(alarm_subscription(alarm_id))
    assert isinstance(response, _MultiThreadedRendezvous)
    for item in response:
        LOGGER.debug(response)
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.GetAlarmResponseStream(alarm_subscription(_alarm_id))
    assert isinstance(_response, _MultiThreadedRendezvous)
    for item in _response:
        LOGGER.debug(_response)
        assert isinstance(item,AlarmResponse)

# Test case that makes use of client fixture to test server's DeleteAlarm method
def test_delete_alarm(monitoring_client):
    LOGGER.warning('test_delete_alarm')
    alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor())
    response = monitoring_client.DeleteAlarm(alarm_id)
    LOGGER.debug(type(response))
    assert isinstance(response, Empty)
    _kpi_id = monitoring_client.SetKpi(create_kpi_request_c())
    _alarm_id = monitoring_client.SetKpiAlarm(alarm_descriptor(_kpi_id))
    _response = monitoring_client.DeleteAlarm(_alarm_id)
    LOGGER.debug(type(_response))
    assert isinstance(_response, Empty)

# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
@@ -440,14 +444,116 @@ def test_managementdb_tools_delete_kpi(management_db): # pylint: disable=redefin

    assert response

def test_managementdb_tools_insert_alarm(management_db):
    LOGGER.warning('test_managementdb_tools_insert_alarm begin')

    _alarm_description  = "Alarm Description"
    _alarm_name         = "Alarm Name"
    _kpi_id             = "3"
    _kpi_min_value      = 0.0
    _kpi_max_value      = 250.0
    _in_range           = True
    _include_min_value  = False
    _include_max_value  = True

    _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value,
                                               _kpi_max_value,
                                               _in_range, _include_min_value, _include_max_value)
    LOGGER.debug(_alarm_id)
    assert isinstance(_alarm_id,int)

def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_metric_sdb_tools_write_kpi begin')

    for i in range(200):
        kpiSampleType = KpiSampleType.Name(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED).upper().replace('KPISAMPLETYPE_', '')
        kpiId = "3"
        deviceId = 'DEV3'
        endpointId = 'END3'
        serviceId = 'SERV3'
        sliceId = 'SLC3'
        connectionId = 'CON3'
        time_stamp = timestamp_utcnow_to_float()
        kpi_value = 500*random()

        metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
                                  kpi_value)
        sleep(0.05)

def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')


def test_subscription_manager_create_subscription(management_db,metrics_db,subs_scheduler):
    LOGGER.warning('test_subscription_manager_create_subscription begin')
    subs_queue = Queue()

    subs_manager = SubscriptionManager(metrics_db)

    subs_scheduler.add_job(ingestion_data)

    kpi_id = "3"
    sampling_duration_s = 20
    sampling_interval_s = 3
    real_start_time     = timestamp_utcnow_to_float()
    start_timestamp     = real_start_time
    end_timestamp       = start_timestamp + sampling_duration_s

    subs_id = management_db.insert_subscription(kpi_id, "localhost", sampling_duration_s,
                                               sampling_interval_s,start_timestamp,end_timestamp)
    subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,
                                     sampling_duration_s,start_timestamp,end_timestamp)

    # This is here to simulate application activity (which keeps the main thread alive).
    total_points = 0
    while True:
        while not subs_queue.empty():
            list = subs_queue.get_nowait()
            kpi_list = KpiList()
            for item in list:
                kpi = Kpi()
                kpi.kpi_id.kpi_id.uuid = item[0]
                kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                kpi.kpi_value.floatVal = item[2]
                kpi_list.kpi.append(kpi)
                total_points += 1
            LOGGER.debug(kpi_list)
        if timestamp_utcnow_to_float() > end_timestamp:
            break

    assert total_points != 0

def test_alarm_manager_create_alarm(management_db,metrics_db):
    LOGGER.warning('test_alarm_manager_create_alarm begin')

    _alarm_description  = "Alarm Description"
    _alarm_name         = "Alarm Name"
    _kpi_id             = "3"
    _kpi_min_value      = 0.0
    _kpi_max_value      = 250.0
    _in_range           = True
    _include_min_value  = False
    _include_max_value  = True
    _subscription_frequency_ms = 10

    _alarm_id = management_db.insert_alarm(_alarm_description, _alarm_name, _kpi_id, _kpi_min_value,
                                               _kpi_max_value,
                                               _in_range, _include_min_value, _include_max_value)

    LOGGER.debug(f"alarm_id: {_alarm_id}")
    _alarm_queue = Queue()
    _alarm_manager = AlarmManager(metrics_db)

    _alarm_manager.create_alarm(_alarm_queue,str(_alarm_id),_kpi_id,_kpi_min_value,_kpi_max_value,_in_range,_include_min_value,_include_max_value,_subscription_frequency_ms)

    LOGGER.debug(_alarm_queue)

    while not _alarm_queue.empty():
        list = _alarm_queue.get_nowait()
        LOGGER.debug(list)
        for item in list:
            LOGGER.debug(item)


def test_events_tools(
        context_client : ContextClient,                 # pylint: disable=redefined-outer-name