
Commit 87ddc7d8 authored by Javi Moreno

More unit tests added

parent 7014b1ac
1 merge request: !54 Release 2.0.0
@@ -28,3 +28,6 @@ class Influx():
        points = results.get_points(tags={'kpi_id' : '1','device_id': '1', 'kpi_sample_type': '101'})
        for point in points:
            print("Time: %s, Value: %i" % (point['time'], point['kpi_value']))
        return points
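For readers unfamiliar with the API used above: get_points() comes from influxdb-python's ResultSet and filters query results by tag values. A minimal, self-contained sketch of the same pattern against the client directly (connection parameters and the 'samples' measurement name are placeholders, not taken from this commit):

from influxdb import InfluxDBClient

# Placeholder connection values; in the tests below they come from the INFLUXDB_* environment variables.
client = InfluxDBClient('localhost', 8086, 'user', 'password', 'monitoring')
results = client.query('SELECT * FROM samples')
# get_points() yields only the points whose tags match the given values, as in the diff above.
points = results.get_points(tags={'kpi_id': '1', 'device_id': '1', 'kpi_sample_type': '101'})
for point in points:
    print("Time: %s, Value: %i" % (point['time'], point['kpi_value']))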
@@ -54,6 +54,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
            kpi_service_id = request.service_id.service_uuid.uuid
            data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
            kpi_id.kpi_id.uuid = str(data)
            # CREATEKPI_COUNTER_COMPLETED.inc()
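The commented-out CREATEKPI_COUNTER_COMPLETED.inc() call above suggests a Prometheus-style metric whose definition is not part of this diff. A hedged sketch of that pattern with prometheus_client (the metric name and help text are assumptions):

from prometheus_client import Counter

# Hypothetical definition matching the commented-out call above; not part of this commit.
CREATEKPI_COUNTER_COMPLETED = Counter(
    'monitoring_createkpi_completed_total',
    'Number of CreateKpi requests completed successfully')

def record_create_kpi_completed():
    # Increment the completion counter; the diff above keeps the call commented out for now.
    CREATEKPI_COUNTER_COMPLETED.inc()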
@@ -54,4 +54,5 @@ class SQLite():
     def get_KPIS(self):
         data = self.client.execute("SELECT * FROM KPI")
         for row in data:
-            print(row)
\ No newline at end of file
+            print(row)
+        return data.fetchall()
\ No newline at end of file
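One caveat about the new return statement above: a sqlite3 cursor is consumed as it is iterated, so after the print loop has run, data.fetchall() returns an empty list. A sketch of the same method that fetches once and keeps both behaviors (an editorial suggestion, not what this commit contains):

    def get_KPIS(self):
        # Fetch all rows up front; iterating the cursor first would leave fetchall() empty.
        data = self.client.execute("SELECT * FROM KPI")
        rows = data.fetchall()
        for row in rows:
            print(row)
        return rows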
import logging, grpc
import os
import sqlite3
import pytest
from typing import Tuple
@@ -6,6 +9,7 @@ from monitoring.proto import context_pb2, kpi_sample_types_pb2
from monitoring.proto import monitoring_pb2
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from monitoring.service import SqliteTools, InfluxTools
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector
@@ -38,6 +42,11 @@ SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
]
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
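These tests assume all four INFLUXDB_* variables are set; os.environ.get() silently returns None otherwise, which only surfaces later as a connection error inside the influx_db fixture below. A small guard one could add (hypothetical, not part of this commit):

import os

REQUIRED_INFLUX_VARS = ("INFLUXDB_HOSTNAME", "INFLUXDB_USER", "INFLUXDB_PASSWORD", "INFLUXDB_DATABASE")

# Fail fast with a clear message instead of passing None values to the Influx wrapper.
missing = [name for name in REQUIRED_INFLUX_VARS if os.environ.get(name) is None]
if missing:
    raise RuntimeError("Missing InfluxDB environment variables: " + ", ".join(missing))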
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    name,db_backend,db_settings,mb_backend,mb_settings = request.param
@@ -115,6 +124,16 @@ def kpi_id():
    return kpi_id

@pytest.fixture(scope='session')
def sql_db():
    sql_db = SqliteTools.SQLite('monitoring.db')
    return sql_db

@pytest.fixture(scope='session')
def influx_db():
    influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME, "8086", INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
    return influx_db

@pytest.fixture(scope='session')
def create_kpi_request():
    LOGGER.warning('test_include_kpi begin')
@@ -200,11 +219,115 @@ def test_get_kpidescritor_kpi(monitoring_client,kpi_id):
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiDescriptor)

def test_sqlitedb_tools_insert_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
    kpi_description = create_kpi_request.kpi_description
    kpi_sample_type = create_kpi_request.kpi_sample_type
    kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
    kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
    kpi_service_id = create_kpi_request.service_id.service_uuid.uuid
    response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    assert isinstance(response, int)

def test_sqlitedb_tools_get_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    kpi_description = create_kpi_request.kpi_description
    kpi_sample_type = create_kpi_request.kpi_sample_type
    kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
    kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
    kpi_service_id = create_kpi_request.service_id.service_uuid.uuid
    kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    response = sql_db.get_KPI(kpi_id)
    assert isinstance(response, tuple)

def test_sqlitedb_tools_get_kpis(sql_db):
    LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
    response = sql_db.get_KPIS()
    assert isinstance(response, list)

def test_sqlitedb_tools_delete_kpi(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_delete_kpi begin')
    response = sql_db.delete_KPI("DEV1", kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED)
    if response == False:
        kpi_description = create_kpi_request.kpi_description
        kpi_sample_type = create_kpi_request.kpi_sample_type
        kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
        kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
        kpi_service_id = create_kpi_request.service_id.service_uuid.uuid
        sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_KPI("DEV1", kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED)
    assert response == True

def test_sqlitedb_tools_delete_kpid_id(sql_db, create_kpi_request):
    LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')
    response = sql_db.delete_kpid_id(1)
    if response == False:
        kpi_description = create_kpi_request.kpi_description
        kpi_sample_type = create_kpi_request.kpi_sample_type
        kpi_device_id = create_kpi_request.device_id.device_uuid.uuid
        kpi_endpoint_id = create_kpi_request.endpoint_id.endpoint_uuid.uuid
        kpi_service_id = create_kpi_request.service_id.service_uuid.uuid
        kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_kpid_id(kpi_id)
    assert response == True

def test_influxdb_tools_write_kpi(influx_db):
    LOGGER.warning('test_influxdb_tools_write_kpi begin')

def test_influxdb_tools_read_kpi_points(influx_db):
    LOGGER.warning('test_influxdb_tools_read_kpi_points begin')
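The two InfluxDB test bodies above are placeholders that only log a marker. A hedged sketch of the kind of write/read round trip they could eventually exercise, written directly against influxdb-python because the wrapper's write/read signatures are not visible in this diff (the measurement name and field values are assumptions):

from influxdb import InfluxDBClient

def influxdb_roundtrip_sketch(hostname, user, password, database):
    # Write one KPI sample tagged the same way as the points read in InfluxTools above.
    client = InfluxDBClient(hostname, 8086, user, password, database)
    sample = {
        'measurement': 'samples',
        'tags': {'kpi_id': '1', 'device_id': '1', 'kpi_sample_type': '101'},
        'fields': {'kpi_value': 10},
    }
    assert client.write_points([sample])  # write_points() returns True on success
    # Read the sample back, filtering the result set by tag values.
    results = client.query('SELECT * FROM samples')
    points = list(results.get_points(tags={'kpi_id': '1'}))
    assert len(points) >= 1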

def test_events_tools(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
                      monitoring_client : MonitoringClient,
                      context_db_mb: Tuple[Database, MessageBroker]):
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    # # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    # ----- Update the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetDevice(Device(**DEVICE1))
    assert response.device_uuid.uuid == DEVICE1_UUID

    events_collector.stop()

def test_get_device_events(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
                           monitoring_client : MonitoringClient,
                           context_db_mb: Tuple[Database, MessageBroker]):
    LOGGER.warning('test_getkpidescritor_kpi begin')
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]
@@ -232,10 +355,6 @@ def test_get_device_events(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.device_id.device_uuid.uuid == DEVICE1_UUID

    # ----- Update the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetDevice(Device(**DEVICE1))
    assert response.device_uuid.uuid == DEVICE1_UUID

    events_collector.stop()

def test_listen_events(monitoring_client: MonitoringClient,
......