Commit 4c2f3c9d authored by Waleed Akbar's avatar Waleed Akbar
Browse files

some changes for testing

parent 258f2dbd
Loading
Loading
Loading
Loading
+18 −2
Original line number Diff line number Diff line
@@ -27,6 +27,9 @@ from kpi_manager.database.KpiModel import Kpi as KpiModel
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC')

class IDNotFoundError(Exception):
    ...

class KpiManagerServiceServicerImpl(KpiManagerServiceServicer):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.debug('Init KpiManagerService')
@@ -58,9 +61,22 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer):
            if row is not None:
                response = KpiModel.convert_row_to_KpiDescriptor(row)
                return response
            if row is None:
                print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search))
                LOGGER.debug('No matching row found kpi id: {:}'.format(kpi_id_to_search))
                return Empty()
        except Exception as e:
            print ('Unable to search kpi id. {:}'.format(e))
            LOGGER.debug('Unable to search kpi id. {:}'.format(e))
            raise e
        # kpi_id_to_search = request.kpi_id.uuid
        # row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
        # if row is None:
        #     print ('Unable to search kpi id. {:}'.format(kpi_id_to_search))
        #     LOGGER.debug('Unable to search kpi id. {:}'.format(kpi_id_to_search))
        #     raise IDNotFoundError
        # response = KpiModel.convert_row_to_KpiDescriptor(row)
        # return response
    
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore
+8 −0
Original line number Diff line number Diff line
@@ -36,6 +36,9 @@ from context.client.ContextClient import ContextClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
from kpi_manager.service.KpiManagerService import KpiManagerService
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from kpi_value_writer.tests.test_messages import create_kpi_id_request


from monitoring.service.NameMapping import NameMapping

@@ -214,6 +217,11 @@ def test_GetKpiDescriptor(kpi_manager_client):
    # get KPI
    response = kpi_manager_client.GetKpiDescriptor(response_id)
    LOGGER.info("Response gRPC message object: {:}".format(response))

    LOGGER.info(" >>> calling GetKpiDescriptor with random ID")
    rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
    LOGGER.info("Response gRPC message object: {:}".format(rand_response))

    assert isinstance(response, KpiDescriptor)

# def test_SelectKpiDescriptor(kpi_manager_client):
+24 −6
Original line number Diff line number Diff line
@@ -15,10 +15,30 @@
import logging
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from common.tools.kafka.Variables import KafkaTopic

from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from common.proto.kpi_manager_pb2 import KpiDescriptor
from kpi_value_writer.tests.test_messages import create_kpi_id_request

LOGGER = logging.getLogger(__name__)

def test_GetKpiDescriptor():
    LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
    kpi_manager_client = KpiManagerClient()
    # adding KPI
    LOGGER.info(" >>> calling SetKpiDescriptor ")
    response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
    # get KPI
    LOGGER.info(" >>> calling GetKpiDescriptor with response ID")
    response = kpi_manager_client.GetKpiDescriptor(response_id)
    LOGGER.info("Response gRPC message object: {:}".format(response))
    
    LOGGER.info(" >>> calling GetKpiDescriptor with random ID")
    rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
    LOGGER.info("Response gRPC message object: {:}".format(rand_response))

    LOGGER.info("\n------------------ TEST FINISHED ---------------------\n")
    assert isinstance(response, KpiDescriptor)

# -------- Initial Test ----------------
# def test_validate_kafka_topics():
@@ -26,9 +46,7 @@ LOGGER = logging.getLogger(__name__)
#     response = KafkaTopic.create_all_topics()
#     assert isinstance(response, bool)

def test_KafkaConsumer():
    LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
    KpiValueWriter.RunKafkaConsumer()
# def test_KafkaConsumer():
#     LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
#     KpiValueWriter.RunKafkaConsumer()
# def test_metric_composer_and_writer():
#     LOGGER.debug(" --->>> test_metric_composer_and_writer: START <<<--- ")
+8 −3
Original line number Diff line number Diff line
@@ -18,6 +18,11 @@ from common.proto import kpi_manager_pb2
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_sample_types_pb2 import KpiSampleType

def create_kpi_id_request():
    _create_kpi_id = kpi_manager_pb2.KpiId()
    _create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
    return _create_kpi_id


def create_kpi_descriptor_request(description: str = "Test Description"):
    _create_kpi_request                                    = kpi_manager_pb2.KpiDescriptor()