Skip to content
Snippets Groups Projects
Commit 58408ee6 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Some improvements to Kpi Manager test and messages file.

- comment is added in Kpi DB file for future reference.
parent 45f1addf
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!255Resolve "KPI Management Enhancements"
......@@ -34,14 +34,15 @@ class KpiDB:
def create_database(self) -> None:
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
# TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables.
try:
KpiModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
......
......@@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
LOGGER.info('>>> test_verify_Tables : START <<< ')
kpiDBobj = KpiDB()
kpiDBobj.drop_database()
kpiDBobj.verify_tables()
# kpiDBobj.drop_database()
# kpiDBobj.verify_tables()
kpiDBobj.create_database()
kpiDBobj.create_tables()
kpiDBobj.verify_tables()
......@@ -17,7 +17,7 @@ import os, pytest
import logging
from typing import Union
#from common.proto.context_pb2 import Empty
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
......@@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
#from context.client.ContextClient import ContextClient
# from device.service.driver_api.DriverFactory import DriverFactory
# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
# from device.service.DeviceService import DeviceService
# from device.client.DeviceClient import DeviceClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
from kpi_manager.service.KpiManagerService import KpiManagerService
......@@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from kpi_manager.tests.test_messages import create_kpi_id_request
#from monitoring.service.NameMapping import NameMapping
#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
#from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
......@@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){}
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
......@@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
# @pytest.fixture(scope='session')
# def context_service():
# LOGGER.info('Initializing MockContextService...')
# _service = MockContextService(MOCKSERVICE_PORT)
# _service.start()
# LOGGER.info('Yielding MockContextService...')
# yield _service
# LOGGER.info('Terminating MockContextService...')
# _service.context_servicer.msg_broker.terminate()
# _service.stop()
# LOGGER.info('Terminated MockContextService...')
# @pytest.fixture(scope='session')
# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing ContextClient...')
# _client = ContextClient()
# LOGGER.info('Yielding ContextClient...')
# yield _client
# LOGGER.info('Closing ContextClient...')
# _client.close()
# LOGGER.info('Closed ContextClient...')
# @pytest.fixture(scope='session')
# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceService...')
# driver_factory = DriverFactory(DRIVERS)
# driver_instance_cache = DriverInstanceCache(driver_factory)
# _service = DeviceService(driver_instance_cache)
# _service.start()
# # yield the server, when test finishes, execution will resume to stop it
# LOGGER.info('Yielding DeviceService...')
# yield _service
# LOGGER.info('Terminating DeviceService...')
# _service.stop()
# LOGGER.info('Terminated DeviceService...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_manager_service():
LOGGER.info('Initializing KpiManagerService...')
#name_mapping = NameMapping()
# _service = MonitoringService(name_mapping)
# _service = KpiManagerService(name_mapping)
_service = KpiManagerService()
_service.start()
......@@ -194,22 +106,22 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
###########################
# ---------- 3rd Iteration Tests ----------------
# def test_SetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiId)
def test_SetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiId)
# def test_DeleteKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# # adding KPI
# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # deleting KPI
# del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# # select KPI
# kpi_manager_client.GetKpiDescriptor(response_id)
# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
# assert isinstance(del_response, Empty)
def test_DeleteKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# adding KPI
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# deleting KPI
del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# select KPI
kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
assert isinstance(del_response, Empty)
def test_GetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
......@@ -225,21 +137,21 @@ def test_GetKpiDescriptor(kpi_manager_client):
assert isinstance(response, KpiDescriptor)
# def test_SelectKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# # adding KPI
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # select KPI(s)
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptorList)
def test_SelectKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# adding KPI
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# select KPI(s)
response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiDescriptorList)
# def test_set_list_of_KPIs(kpi_manager_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# # adding KPI
# for kpi in KPIs_TO_SEARCH:
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
def test_set_list_of_KPIs(kpi_manager_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# adding KPI
for kpi in KPIs_TO_SEARCH:
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
# ---------- 2nd Iteration Tests -----------------
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment