diff --git a/deploy/all.sh b/deploy/all.sh index f93cd92ac5e3189b0dc8fa71d74a586e929aaecc..e9b33b469b7cad1547ab0dcb63e326672f51971e 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} # Deploy QuestDB ./deploy/qdb.sh +# Deploy Apache Kafka +./deploy/kafka.sh + # Expose Dashboard ./deploy/expose_dashboard.sh diff --git a/deploy/kafka.sh b/deploy/kafka.sh index 4a91bfc9e657d1b8a6a548b9c0a81a2f8a0b45e0..21ba89408e6bdadd92a2a96c59d6d24cd580952e 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -20,50 +20,69 @@ # If not already set, set the namespace where Apache Kafka will be deployed. export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} +# If not already set, set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"} + +# If not already set, if flag is YES, Apache Kafka will be redeployed and all topics will be lost. +export KFK_REDEPLOY=${KFK_REDEPLOY:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## -# Constants -TMP_FOLDER="./tmp" -KFK_MANIFESTS_PATH="manifests/kafka" -KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml" -KFK_MANIFEST="02-kafka.yaml" + # Constants + TMP_FOLDER="./tmp" + KFK_MANIFESTS_PATH="manifests/kafka" + KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml" + KFK_MANIFEST="02-kafka.yaml" + + # Create a tmp folder for files modified during the deployment + TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests" + mkdir -p ${TMP_MANIFESTS_FOLDER} -# Create a tmp folder for files modified during the deployment -TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests" -mkdir -p ${TMP_MANIFESTS_FOLDER} +function kafka_deploy() { + # copy zookeeper and kafka manifest files to temporary manifest location + cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" + cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" -# copy zookeeper and kafka manifest files to temporary manifest location -cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" -cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" + # echo "Apache Kafka Namespace" + echo ">>> Delete Apache Kafka Namespace" + kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found -echo "Apache Kafka Namespace" -echo ">>> Delete Apache Kafka Namespace" -kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found + echo ">>> Create Apache Kafka Namespace" + kubectl create namespace ${KFK_NAMESPACE} -echo ">>> Create Apache Kafka Namespace" -kubectl create namespace ${KFK_NAMESPACE} + # echo ">>> Deplying Apache Kafka Zookeeper" + # Kafka zookeeper service should be deployed before the kafka service + kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" -echo ">>> Deplying Apache Kafka Zookeeper" -# Kafka zookeeper service should be deployed before the kafka service -kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" + KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically + KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}') -KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically -KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}') + # Kafka service should be deployed after the zookeeper service + sed -i "s//${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" -# Kafka service should be deployed after the zookeeper service -sed -i "s//${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + # echo ">>> Deploying Apache Kafka Broker" + kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" -echo ">>> Deploying Apache Kafka Broker" -kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + # echo ">>> Verifing Apache Kafka deployment" + sleep 5 + # KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) + # if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then + # echo "Deployment Error: \n $KFK_PODS_STATUS" + # else + # echo "$KFK_PODS_STATUS" + # fi +} -echo ">>> Verifing Apache Kafka deployment" -sleep 10 -KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) -if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then - echo "Deployment Error: \n $KFK_PODS_STATUS" +echo "Apache Kafka" +echo ">>> Checking if Apache Kafka is deployed ... " +if [ "$KFK_REDEPLOY" == "YES" ]; then + kafka_deploy +elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then + echo ">>> Apache Kafka already present; skipping step." else - echo "$KFK_PODS_STATUS" -fi \ No newline at end of file + kafka_deploy +fi +echo diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 62f36a2c138c99b1ee666c8c5397083266ad699d..4ecfaae9972ec136b2b7e74c6071392ba288f0d3 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -115,6 +115,17 @@ export PROM_EXT_PORT_HTTP=${PROM_EXT_PORT_HTTP:-"9090"} export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} +# ----- Apache Kafka ------------------------------------------------------ + +# If not already set, set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} + +# If not already set, set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"} + +# If not already set, if flag is YES, Apache Kafka will be redeployed and topic will be lost. +export KFK_REDEPLOY=${KFK_REDEPLOY:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## @@ -147,7 +158,7 @@ kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type= --from-literal=CRDB_SSLMODE=require printf "\n" -echo "Create secret with CockroachDB data for KPI Management" +echo "Create secret with CockroachDB data for KPI Management microservices" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ @@ -159,6 +170,13 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t --from-literal=CRDB_SSLMODE=require printf "\n" +echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices" +KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') +kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ + --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ + --from-literal=KFK_SERVER_PORT=${KFK_SERVER_PORT} +printf "\n" + echo "Create secret with NATS data" NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}') if [ -z "$NATS_CLIENT_PORT" ]; then diff --git a/manifests/kpi_value_apiservice.yaml b/manifests/kpi_value_apiservice.yaml index 74eb90f675794f1b451b04af55e191edff58fae5..e4dcb00545ffaa33de39fd29c029780b777ea91f 100644 --- a/manifests/kpi_value_apiservice.yaml +++ b/manifests/kpi_value_apiservice.yaml @@ -39,6 +39,9 @@ spec: env: - name: LOG_LEVEL value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30020"] diff --git a/manifests/kpi_value_writerservice.yaml b/manifests/kpi_value_writerservice.yaml index 8a8e44ec2a571f1290e30a08d1c896a6339cbe46..e21e36f48ba08999f142e8548fed61cd2dfef0cc 100644 --- a/manifests/kpi_value_writerservice.yaml +++ b/manifests/kpi_value_writerservice.yaml @@ -39,6 +39,9 @@ spec: env: - name: LOG_LEVEL value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30030"] diff --git a/my_deploy.sh b/my_deploy.sh index b89df7481ebd17edf2b966eb818598d1a04a596f..45e0d1301b2807ae7393d45366f849c69285b909 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -181,3 +181,8 @@ export GRAF_EXT_PORT_HTTP="3000" # Set the namespace where Apache Kafka will be deployed. export KFK_NAMESPACE="kafka" +# Set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT="9092" + +# Set the flag to YES for redeploying of Apache Kafka +export KFK_REDEPLOY="" diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 24ae2cff7b5e710e18999eb09029216a4a5d6c8a..168957a26c4f1947d65d7c849c25ed7e9dad06be 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -16,14 +16,20 @@ import logging from enum import Enum from confluent_kafka import KafkaException from confluent_kafka.admin import AdminClient, NewTopic +from common.Settings import get_setting LOGGER = logging.getLogger(__name__) class KafkaConfig(Enum): - # SERVER_IP = "127.0.0.1:9092" - SERVER_IP = "kafka-service.kafka.svc.cluster.local:9092" - ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP}) + KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' + KFK_NAMESPACE = 'kafka' + # KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = '9092' + # KFK_PORT = get_setting('KFK_SERVER_PORT') + # SERVER_ADDRESS = "127.0.0.1:9092" + SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) class KafkaTopic(Enum): REQUEST = 'topic_request' @@ -38,6 +44,7 @@ class KafkaTopic(Enum): Method to create Kafka topics defined as class members """ all_topics = [member.value for member in KafkaTopic] + LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics are created sucsessfully") return True diff --git a/src/kpi_manager/README.md b/src/kpi_manager/README.md index c1feadcc4843db26a219d1e3b37833ddd80b18dc..6e9b56d9349aa6acd5c41004e32c933619a37f65 100644 --- a/src/kpi_manager/README.md +++ b/src/kpi_manager/README.md @@ -1,29 +1,24 @@ # How to locally run and test KPI manager micro-service -## --- File links need to be updated. --- ### Pre-requisets -The following requirements should be fulfilled before the execuation of KPI management service. +Ensure the following requirements are met before executing the KPI management service: -1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully. -2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully. -3. Verify the creation of required database and table. -[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and -[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment. +1. A virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/requirements.in) sucessfully installed. +2. Verify the creation of required database and table. The +[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_db.py) python file lists the functions to create tables and the database. The +[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/database/KpiEngine.py) file contains the DB string. ### Messages format templates -["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing. +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_messages.py) python file contains templates for creating gRPC messages. -### Test file -["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment. +### Unit test file +The ["KPI manager test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_manager.py) python file lists various tests conducted to validate functionality. ### Flow of execution (Kpi Maanager Service functions) 1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table. -2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. +2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor to the `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. -3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB. +3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from the DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from the DB. -4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. - -## For KPI composer and KPI writer -The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). +4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py index 4b60640707c8d0c2ce90e5ab135ddf6fd4c91f63..49ad9c9b579daa918818366a1d9505089968edc2 100644 --- a/src/kpi_manager/database/Kpi_DB.py +++ b/src/kpi_manager/database/Kpi_DB.py @@ -34,14 +34,15 @@ class KpiDB: def create_database(self) -> None: if not sqlalchemy_utils.database_exists(self.db_engine.url): - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) sqlalchemy_utils.create_database(self.db_engine.url) + LOGGER.debug("Database created. {:}".format(self.db_engine.url)) def drop_database(self) -> None: if sqlalchemy_utils.database_exists(self.db_engine.url): sqlalchemy_utils.drop_database(self.db_engine.url) def create_tables(self): + # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables. try: KpiModel.metadata.create_all(self.db_engine) # type: ignore LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name)) @@ -69,8 +70,7 @@ class KpiDB: session.rollback() if "psycopg2.errors.UniqueViolation" in str(e): LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) + raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] ) else: LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) @@ -89,7 +89,6 @@ class KpiDB: print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) return None except Exception as e: - session.rollback() LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) finally: diff --git a/src/kpi_manager/tests/test_kpi_db.py b/src/kpi_manager/tests/test_kpi_db.py index e961c12bacdbac07f111b229435ed3d89d62581f..d4a57f83664f851504389b3bbe99d5c2a92542d9 100644 --- a/src/kpi_manager/tests/test_kpi_db.py +++ b/src/kpi_manager/tests/test_kpi_db.py @@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__) def test_verify_databases_and_Tables(): LOGGER.info('>>> test_verify_Tables : START <<< ') kpiDBobj = KpiDB() - kpiDBobj.drop_database() - kpiDBobj.verify_tables() + # kpiDBobj.drop_database() + # kpiDBobj.verify_tables() kpiDBobj.create_database() kpiDBobj.create_tables() kpiDBobj.verify_tables() diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index f0d9526d33694a683b70180eb3bc6de833bf1cfa..219fdadee9e2f4ca9ea9ac0be040043d4edfbdbe 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -17,7 +17,7 @@ import os, pytest import logging from typing import Union -#from common.proto.context_pb2 import Empty +from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) @@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList from common.tools.service.GenericGrpcService import GenericGrpcService -#from context.client.ContextClient import ContextClient - -# from device.service.driver_api.DriverFactory import DriverFactory -# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -# from device.service.DeviceService import DeviceService -# from device.client.DeviceClient import DeviceClient from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a from kpi_manager.service.KpiManagerService import KpiManagerService @@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient from kpi_manager.tests.test_messages import create_kpi_descriptor_request from kpi_manager.tests.test_messages import create_kpi_id_request - -#from monitoring.service.NameMapping import NameMapping - -#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' -#from device.service.drivers import DRIVERS - ########################### # Tests Setup ########################### @@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT) -# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){} - LOGGER = logging.getLogger(__name__) class MockContextService(GenericGrpcService): @@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService): self.context_servicer = MockServicerImpl_Context() add_ContextServiceServicer_to_server(self.context_servicer, self.server) -# @pytest.fixture(scope='session') -# def context_service(): -# LOGGER.info('Initializing MockContextService...') -# _service = MockContextService(MOCKSERVICE_PORT) -# _service.start() - -# LOGGER.info('Yielding MockContextService...') -# yield _service - -# LOGGER.info('Terminating MockContextService...') -# _service.context_servicer.msg_broker.terminate() -# _service.stop() - -# LOGGER.info('Terminated MockContextService...') - -# @pytest.fixture(scope='session') -# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing ContextClient...') -# _client = ContextClient() - -# LOGGER.info('Yielding ContextClient...') -# yield _client - -# LOGGER.info('Closing ContextClient...') -# _client.close() - -# LOGGER.info('Closed ContextClient...') - -# @pytest.fixture(scope='session') -# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceService...') -# driver_factory = DriverFactory(DRIVERS) -# driver_instance_cache = DriverInstanceCache(driver_factory) -# _service = DeviceService(driver_instance_cache) -# _service.start() - -# # yield the server, when test finishes, execution will resume to stop it -# LOGGER.info('Yielding DeviceService...') -# yield _service - -# LOGGER.info('Terminating DeviceService...') -# _service.stop() - -# LOGGER.info('Terminated DeviceService...') - -# @pytest.fixture(scope='session') -# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceClient...') -# _client = DeviceClient() - -# LOGGER.info('Yielding DeviceClient...') -# yield _client - -# LOGGER.info('Closing DeviceClient...') -# _client.close() - -# LOGGER.info('Closed DeviceClient...') - -# @pytest.fixture(scope='session') -# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceClient...') -# _client = DeviceClient() - -# LOGGER.info('Yielding DeviceClient...') -# yield _client - -# LOGGER.info('Closing DeviceClient...') -# _client.close() - -# LOGGER.info('Closed DeviceClient...') - # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') def kpi_manager_service(): LOGGER.info('Initializing KpiManagerService...') - #name_mapping = NameMapping() - # _service = MonitoringService(name_mapping) - # _service = KpiManagerService(name_mapping) _service = KpiManagerService() _service.start() @@ -181,35 +93,28 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab # Prepare Environment, should be the first test ################################################## -# # ERROR on this test --- -# def test_prepare_environment( -# context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument -# ): -# context_id = json_context_id(DEFAULT_CONTEXT_NAME) -# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME))) -# context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id))) ########################### # Tests Implementation of Kpi Manager ########################### # ---------- 3rd Iteration Tests ---------------- -# def test_SetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiId) +def test_SetKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") + response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + LOGGER.info("Response gRPC message object: {:}".format(response)) + assert isinstance(response, KpiId) -# def test_DeleteKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") -# # adding KPI -# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # deleting KPI -# del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) -# # select KPI -# kpi_manager_client.GetKpiDescriptor(response_id) -# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) -# assert isinstance(del_response, Empty) +def test_DeleteKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") + # adding KPI + response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # deleting KPI + del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) + # select KPI + kpi_manager_client.GetKpiDescriptor(response_id) + LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) + assert isinstance(del_response, Empty) def test_GetKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") @@ -225,77 +130,18 @@ def test_GetKpiDescriptor(kpi_manager_client): assert isinstance(response, KpiDescriptor) -# def test_SelectKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") -# # adding KPI -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # select KPI(s) -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptorList) - -# def test_set_list_of_KPIs(kpi_manager_client): -# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") -# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] -# # adding KPI -# for kpi in KPIs_TO_SEARCH: -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) - - -# ---------- 2nd Iteration Tests ----------------- -# def test_SetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") -# with open("kpi_manager/tests/KPI_configs.json", 'r') as file: -# data = json.load(file) -# _descriptors = data.get('KPIs', []) -# for _descritor_name in _descriptors: -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name)) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiId) - -# def test_GetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") -# response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptor) - -# def test_DeleteKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# del_response = kpi_manager_client.DeleteKpiDescriptor(response) -# kpi_manager_client.GetKpiDescriptor(response) -# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) -# assert isinstance(del_response, Empty) - -# def test_SelectKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a()) -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptorList) - -# ------------- INITIAL TESTs ---------------- -# Test case that makes use of client fixture to test server's CreateKpi method -# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name -# # make call to server -# LOGGER.warning('test_create_kpi requesting') -# for i in range(3): -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1))) -# LOGGER.debug(str(response)) -# assert isinstance(response, KpiId) - -# # Test case that makes use of client fixture to test server's DeleteKpi method -# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name -# # make call to server -# LOGGER.warning('delete_kpi requesting') -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4')) -# response = kpi_manager_client.DeleteKpiDescriptor(response) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_SelectKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") + # adding KPI + kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # select KPI(s) + response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) + LOGGER.info("Response gRPC message object: {:}".format(response)) + assert isinstance(response, KpiDescriptorList) -# # Test case that makes use of client fixture to test server's GetKpiDescriptor method -# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name -# LOGGER.warning('test_selectkpidescritor begin') -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, KpiDescriptorList) +def test_set_list_of_KPIs(kpi_manager_client): + LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") + KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] + # adding KPI + for kpi in KPIs_TO_SEARCH: + kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) diff --git a/src/kpi_value_api/Dockerfile b/src/kpi_value_api/Dockerfile index 7dd8d307b8338c4a29e97c742ca12a49c4611e0a..25b8da931f88000dd229c536456a3eb1fa7f56db 100644 --- a/src/kpi_value_api/Dockerfile +++ b/src/kpi_value_api/Dockerfile @@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/kpi_value_api/. kpi_value_api/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py +COPY src/kpi_manager/client/. kpi_manager/client/ # Start the service ENTRYPOINT ["python", "-m", "kpi_value_api.service"] diff --git a/src/kpi_value_api/README.md b/src/kpi_value_api/README.md new file mode 100644 index 0000000000000000000000000000000000000000..70ba2c5e79c79147e336307ecc6d5ddfc263df90 --- /dev/null +++ b/src/kpi_value_api/README.md @@ -0,0 +1,23 @@ +# How to locally run and test KPI Value API micro-service + +### Pre-requisets +Ensure the following requirements are met before executing the KPI Value API service. + +1. The KPI Manger service is running and Apache Kafka is running. + +2. A virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/requirements.in) file sucessfully installed. + +3. Call the ["create_all_topics()"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) function to verify the existence of all required topics on kafka. + +### Messages format templates +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/messages.py) python file contains templates for creating gRPC messages. + +### Unit test file +The ["KPI Value API test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/test_kpi_value_api.py) python file enlist various tests conducted to validate functionality. + +### Flow of execution (Kpi Maanager Service functions) +1. Call the `create_new_topic_if_not_exists()` method to create any new topics if needed. + +2. Call `StoreKpiValues(KpiValueList)` to produce `Kpi Value` on a Kafka Topic. (The `KpiValueWriter` microservice will consume and process the `Kpi Value`) + +3. Call `SelectKpiValues(KpiValueFilter) -> KpiValueList` to read metric from the Prometheus DB. diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index 7e4694109dc4e1d31b86abfc03162494faafcdaf..f5695906a8d02d55e15960a76986b8d03f02dba1 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -14,3 +14,4 @@ confluent-kafka==2.3.* requests==2.27.* +prometheus-api-client==0.5.3 \ No newline at end of file diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index d27de54f3cddfd0d70d656a89c45adc50e518289..5e7c3d139a9fd041d5d9fc6dd7032cb54bda17c4 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,22 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, requests +import logging, grpc from typing import Tuple, Any -from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType from confluent_kafka import Producer as KafkaProducer +from prometheus_api_client import PrometheusConnect +from prometheus_api_client.utils import parse_datetime + +from kpi_manager.client.KpiManagerClient import KpiManagerClient LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') -PROM_URL = "http://localhost:9090" +PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TODO: updated with the env variables class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): @@ -38,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.SERVER_IP.value + 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value }) for kpi_value in request.kpi_value_list: kpi_value_to_produce : Tuple [str, Any, Any] = ( @@ -63,40 +68,68 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> KpiValueList: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) response = KpiValueList() - metrics = [kpi.kpi_id for kpi in request.kpi_id] - start_timestamps = [timestamp for timestamp in request.start_timestamp] - end_timestamps = [timestamp for timestamp in request.end_timestamp] - results = [] + + kpi_manager_client = KpiManagerClient() + prom_connect = PrometheusConnect(url=PROM_URL) - for start, end in zip(start_timestamps, end_timestamps): - start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z" - end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z" + metrics = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi in request.kpi_id] + start_timestamps = [parse_datetime(timestamp) for timestamp in request.start_timestamp] + end_timestamps = [parse_datetime(timestamp) for timestamp in request.end_timestamp] + prom_response = [] + for start_time, end_time in zip(start_timestamps, end_timestamps): for metric in metrics: - url = f'{PROM_URL}/api/v1/query_range' - params = { - 'query': metric, - 'start': start_str, - 'end' : end_str, - 'step' : '30s' # or any other step you need - } - response = requests.get(url, params=params) - if response.status_code == 200: - data = response.json() - for result in data['data']['result']: - for value in result['values']: - kpi_value = KpiValue( - kpi_id=metric, - timestamp=str(seconds=value[0]), - kpi_value_type=self._convert_value_to_kpi_value_type(value[1]) - ) - results.append(kpi_value) + print(start_time, end_time, metric) + LOGGER.debug(">>> Query: {:}".format(metric)) + prom_response.append( + prom_connect.custom_query_range( + query = metric, # this is the metric name and label config + start_time = start_time, + end_time = end_time, + step = 30, # or any other step value (missing in gRPC Filter request) + ) + ) + + for single_resposne in prom_response: + # print ("{:}".format(single_resposne)) + for record in single_resposne: + # print("Record >>> kpi: {:} >>> time & values set: {:}".format(record['metric']['__name__'], record['values'])) + for value in record['values']: + # print("{:} - {:}".format(record['metric']['__name__'], value)) + kpi_value = KpiValue() + kpi_value.kpi_id.kpi_id = record['metric']['__name__'], + kpi_value.timestamp = value[0], + kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + response.kpi_value_list.append(kpi_value) + return response + + def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): + print("--- START -----") - def _convert_value_to_kpi_value_type(self, value): + kpi_id = KpiId() + kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + # print("KpiId generated: {:}".format(kpi_id)) + + try: + kpi_descriptor_object = KpiDescriptor() + kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) + # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? + if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: + LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type) # extract and return the name of KpiSampleType + else: + LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + except Exception as e: + LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) + print ("Unable to get KpiDescriptor. Error: {:}".format(e)) + + def ConverValueToKpiValueType(self, value): # Check if the value is an integer (int64) try: - int64_value = int(value) - return KpiValueType(int64Val=int64_value) + int_value = int(value) + return KpiValueType(int64Val=int_value) except ValueError: pass # Check if the value is a float @@ -112,7 +145,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): # If none of the above, treat it as a string return KpiValueType(stringVal=value) - def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index c2a1cbb0b275fb26d6498e4470f3869a105a8d36..d8ad14bd44eebc1e9412cfd5ff2973e6018c95e9 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -18,8 +18,9 @@ from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() - # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. - # because the UUID is used to get the descriptor form KPI DB. + # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB. + # This UUID is used to get the descriptor form the KPI DB. If the Kpi ID does not exists, + # some part of the code won't execute. EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09", str(uuid.uuid4()), str(uuid.uuid4())] diff --git a/src/kpi_value_writer/README.md b/src/kpi_value_writer/README.md index 72ba6e5594adeef4a29d650615716c26273ed115..c45a0e39534fae9efef4174d5ca5be7047845c48 100644 --- a/src/kpi_value_writer/README.md +++ b/src/kpi_value_writer/README.md @@ -1,29 +1,17 @@ -# How to locally run and test KPI manager micro-service +# How to locally run and test the KPI Value Writer micro-service -## --- File links need to be updated. --- ### Pre-requisets -The following requirements should be fulfilled before the execuation of KPI management service. +Ensure the following requirements are meet before executing the KPI Value Writer service> -1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully. -2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully. -3. Verify the creation of required database and table. -[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and -[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment. +1. The KPI Manger and KPI Value API services are running and Apache Kafka is running. -### Messages format templates -["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing. - -### Test file -["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment. - -### Flow of execution (Kpi Maanager Service functions) -1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table. +2. A Virtual enviornment exist with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/requirements.in) file installed sucessfully. -2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. - -3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB. +### Messages format templates +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_messages.py) python file contains the templates to create gRPC messages. -4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. +### Unit test file +The ["KPI Value API test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_kpi_value_writer.py) python file enlist various tests conducted to validate functionality. -## For KPI composer and KPI writer -The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). \ No newline at end of file +### Flow of execution +1. Call the `RunKafkaConsumer` method from the `KpiValueWriter` class to start consuming the `KPI Value` generated by the `KPI Value API` or `Telemetry`. For every valid `KPI Value` consumer from Kafka, it invokes the `PrometheusWriter` class to prepare and push the metric to the Promethues DB. diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 26bab44657606b1f3edc14659d128c5ccc7a6890..5e2b6babe210e4fb71eb8617432a6dfd5d164407 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -33,7 +33,6 @@ from .MetricWriterToPrometheus import MetricWriterToPrometheus LOGGER = logging.getLogger(__name__) ACTIVE_CONSUMERS = [] -METRIC_WRITER = MetricWriterToPrometheus() class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: @@ -48,12 +47,14 @@ class KpiValueWriter(GenericGrpcService): @staticmethod def KafkaConsumer(): + kpi_manager_client = KpiManagerClient() + metric_writer = MetricWriterToPrometheus() + kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, + { 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, 'group.id' : __class__, 'auto.offset.reset' : 'latest'} ) - kpi_manager_client = KpiManagerClient() kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) @@ -72,13 +73,13 @@ class KpiValueWriter(GenericGrpcService): kpi_value.ParseFromString(raw_kpi.value()) LOGGER.info("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) - KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client) + KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) except Exception as e: print("Error detail: {:}".format(e)) continue @staticmethod - def get_kpi_descriptor(kpi_value: str, kpi_manager_client ): + def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer): print("--- START -----") kpi_id = KpiId() @@ -89,12 +90,11 @@ class KpiValueWriter(GenericGrpcService): try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) + # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: - # print("kpi descriptor received: {:}".format(kpi_descriptor_object)) - # if isinstance (kpi_descriptor_object, KpiDescriptor): LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) - METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) + metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index b681164786bd310d457998bae55b836522888b94..f1d07978303dd8ac635978fe4b3bc286a746ce88 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -63,8 +63,8 @@ class MetricWriterToPrometheus: def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) - tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} # extracted values will be used as metric tag - metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] + tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} + metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags metric_name = cooked_kpi['kpi_sample_type'] try: if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists @@ -93,4 +93,5 @@ class MetricWriterToPrometheus: print("Metric {:} is already registered. Skipping.".format(metric_name)) else: LOGGER.error("Error while pushing metric: {}".format(e)) - raise \ No newline at end of file + raise + diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index 572495d48d70cdc40c0ef6bb1efcf877e2a610ee..fce043d7fd6c9b5cbb9374d0b059cb1e2fa65a24 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -14,31 +14,12 @@ import logging from kpi_value_writer.service.KpiValueWriter import KpiValueWriter + from common.tools.kafka.Variables import KafkaTopic -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from kpi_manager.tests.test_messages import create_kpi_descriptor_request -from common.proto.kpi_manager_pb2 import KpiDescriptor -from kpi_value_writer.tests.test_messages import create_kpi_id_request -LOGGER = logging.getLogger(__name__) -# def test_GetKpiDescriptor(): -# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") -# kpi_manager_client = KpiManagerClient() -# # adding KPI -# LOGGER.info(" --->>> calling SetKpiDescriptor ") -# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # get KPI -# LOGGER.info(" --->>> calling GetKpiDescriptor with response ID") -# response = kpi_manager_client.GetKpiDescriptor(response_id) -# LOGGER.info("Response gRPC message object: {:}".format(response)) - -# LOGGER.info(" --->>> calling GetKpiDescriptor with random ID") -# rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) -# LOGGER.info("Response gRPC message object: {:}".format(rand_response)) -# LOGGER.info("\n------------------ TEST FINISHED ---------------------\n") -# assert isinstance(response, KpiDescriptor) +LOGGER = logging.getLogger(__name__) # -------- Initial Test ---------------- def test_validate_kafka_topics(): @@ -49,4 +30,3 @@ def test_validate_kafka_topics(): def test_KafkaConsumer(): LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") KpiValueWriter.RunKafkaConsumer() -