From 56ebd5dc725502862611b66e06f5fe33f3d57a20 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 09:45:49 +0000 Subject: [PATCH 01/14] Foreced changes in KpiValueWriter to handle gRPC empty return message. --- src/kpi_value_writer/service/KpiValueWriter.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 26bab4465..022126fd0 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -33,7 +33,6 @@ from .MetricWriterToPrometheus import MetricWriterToPrometheus LOGGER = logging.getLogger(__name__) ACTIVE_CONSUMERS = [] -METRIC_WRITER = MetricWriterToPrometheus() class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: @@ -48,12 +47,14 @@ class KpiValueWriter(GenericGrpcService): @staticmethod def KafkaConsumer(): + kpi_manager_client = KpiManagerClient() + metric_writer = MetricWriterToPrometheus() + kafka_consumer = KafkaConsumer( { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, 'group.id' : __class__, 'auto.offset.reset' : 'latest'} - ) - kpi_manager_client = KpiManagerClient() + ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) @@ -72,13 +73,13 @@ class KpiValueWriter(GenericGrpcService): kpi_value.ParseFromString(raw_kpi.value()) LOGGER.info("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) - KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client) + KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) except Exception as e: print("Error detail: {:}".format(e)) continue @staticmethod - def get_kpi_descriptor(kpi_value: str, kpi_manager_client ): + def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer): print("--- START -----") kpi_id = KpiId() @@ -89,12 +90,11 @@ class KpiValueWriter(GenericGrpcService): try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) + # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: - # print("kpi descriptor received: {:}".format(kpi_descriptor_object)) - # if isinstance (kpi_descriptor_object, KpiDescriptor): LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) - METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) + metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) -- GitLab From 13ab01f8581f3bd7fcbaeb65c2867a6a1cc675e8 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 09:55:51 +0000 Subject: [PATCH 02/14] updated imports to resolve error generated by unit test. - Imports are updated in test_kpi_value_writer.py --- src/kpi_value_writer/tests/test_kpi_value_writer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index 572495d48..40593af97 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -14,11 +14,12 @@ import logging from kpi_value_writer.service.KpiValueWriter import KpiValueWriter +from kpi_value_writer.tests.test_messages import create_kpi_id_request, create_kpi_descriptor_request + from common.tools.kafka.Variables import KafkaTopic -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from kpi_manager.tests.test_messages import create_kpi_descriptor_request from common.proto.kpi_manager_pb2 import KpiDescriptor -from kpi_value_writer.tests.test_messages import create_kpi_id_request +from kpi_manager.client.KpiManagerClient import KpiManagerClient + LOGGER = logging.getLogger(__name__) -- GitLab From ecea8dac36976bb7c7ce0ec656637e1edf5fc66e Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 14:14:18 +0000 Subject: [PATCH 03/14] Kafka deployment script is integrated with TFS deplyment script. - Kafka env variables are created in my_deply.sh, kafka.sh and all.sh --- deploy/all.sh | 3 ++ deploy/kafka.sh | 83 ++++++++++++++++++++++++++++++------------------- deploy/tfs.sh | 20 +++++++++++- 3 files changed, 73 insertions(+), 33 deletions(-) diff --git a/deploy/all.sh b/deploy/all.sh index f93cd92ac..e9b33b469 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} # Deploy QuestDB ./deploy/qdb.sh +# Deploy Apache Kafka +./deploy/kafka.sh + # Expose Dashboard ./deploy/expose_dashboard.sh diff --git a/deploy/kafka.sh b/deploy/kafka.sh index 4a91bfc9e..b2f2f1f9e 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -20,50 +20,69 @@ # If not already set, set the namespace where Apache Kafka will be deployed. export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} +# If not already set, set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"} + +# If not already set, if flag is YES, Apache Kafka will be redeployed and all topics will be lost. +export KFK_REDEPLOY=${KFK_REDEPLOY:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## -# Constants -TMP_FOLDER="./tmp" -KFK_MANIFESTS_PATH="manifests/kafka" -KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml" -KFK_MANIFEST="02-kafka.yaml" + # Constants + TMP_FOLDER="./tmp" + KFK_MANIFESTS_PATH="manifests/kafka" + KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml" + KFK_MANIFEST="02-kafka.yaml" + + # Create a tmp folder for files modified during the deployment + TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests" + mkdir -p ${TMP_MANIFESTS_FOLDER} -# Create a tmp folder for files modified during the deployment -TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests" -mkdir -p ${TMP_MANIFESTS_FOLDER} +function kafka_deploy() { + # copy zookeeper and kafka manifest files to temporary manifest location + cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" + cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" -# copy zookeeper and kafka manifest files to temporary manifest location -cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" -cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" + # echo "Apache Kafka Namespace" + echo ">>> Delete Apache Kafka Namespace" + kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found -echo "Apache Kafka Namespace" -echo ">>> Delete Apache Kafka Namespace" -kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found + echo ">>> Create Apache Kafka Namespace" + kubectl create namespace ${KFK_NAMESPACE} -echo ">>> Create Apache Kafka Namespace" -kubectl create namespace ${KFK_NAMESPACE} + # echo ">>> Deplying Apache Kafka Zookeeper" + # Kafka zookeeper service should be deployed before the kafka service + kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" -echo ">>> Deplying Apache Kafka Zookeeper" -# Kafka zookeeper service should be deployed before the kafka service -kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}" + KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically + KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}') -KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically -KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}') + # Kafka service should be deployed after the zookeeper service + sed -i "s//${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" -# Kafka service should be deployed after the zookeeper service -sed -i "s//${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + # echo ">>> Deploying Apache Kafka Broker" + kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" -echo ">>> Deploying Apache Kafka Broker" -kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" + # echo ">>> Verifing Apache Kafka deployment" + sleep 5 + # KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) + # if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then + # echo "Deployment Error: \n $KFK_PODS_STATUS" + # else + # echo "$KFK_PODS_STATUS" + # fi +} -echo ">>> Verifing Apache Kafka deployment" -sleep 10 -KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) -if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then - echo "Deployment Error: \n $KFK_PODS_STATUS" +echo "Apache Kafka" +echo ">>> Checking if Apache Kafka is deployed ... " +if [ "$KFK_REDEPLOY" == "YES" ]; then + kafka_deploy +elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then + echo ">>> Apache Kafka already present; skipping step..." else - echo "$KFK_PODS_STATUS" -fi \ No newline at end of file + kafka_deploy +fi +echo diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 62f36a2c1..d92d789e3 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -115,6 +115,17 @@ export PROM_EXT_PORT_HTTP=${PROM_EXT_PORT_HTTP:-"9090"} export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} +# ----- Apache Kafka ------------------------------------------------------ + +# If not already set, set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} + +# If not already set, set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"} + +# If not already set, if flag is YES, Apache Kafka will be redeployed and topic will be lost. +export KFK_REDEPLOY=${KFK_REDEPLOY:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## @@ -147,7 +158,7 @@ kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type= --from-literal=CRDB_SSLMODE=require printf "\n" -echo "Create secret with CockroachDB data for KPI Management" +echo "Create secret with CockroachDB data for KPI Management microservices" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ @@ -159,6 +170,13 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t --from-literal=CRDB_SSLMODE=require printf "\n" +echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices" +KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') +kubectl create secret generic kfk-kpi-data --namespace ${KFK_NAMESPACE} --type='Opaque' \ + --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ + --from-literal=KFK_SERVER_PORT=${KFK_NAMESPACE} +printf "\n" + echo "Create secret with NATS data" NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}') if [ -z "$NATS_CLIENT_PORT" ]; then -- GitLab From a866c39b4b7959dcd0cca03415901653c2994c3c Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 14:43:57 +0000 Subject: [PATCH 04/14] Kafka secret added to kpi_value_api/kpi_value_writer - improvements to accuratly read the env variables --- deploy/kafka.sh | 2 +- deploy/tfs.sh | 4 ++-- manifests/kpi_value_apiservice.yaml | 3 +++ manifests/kpi_value_writerservice.yaml | 3 +++ 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/deploy/kafka.sh b/deploy/kafka.sh index b2f2f1f9e..21ba89408 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -81,7 +81,7 @@ echo ">>> Checking if Apache Kafka is deployed ... " if [ "$KFK_REDEPLOY" == "YES" ]; then kafka_deploy elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then - echo ">>> Apache Kafka already present; skipping step..." + echo ">>> Apache Kafka already present; skipping step." else kafka_deploy fi diff --git a/deploy/tfs.sh b/deploy/tfs.sh index d92d789e3..4ecfaae99 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -172,9 +172,9 @@ printf "\n" echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices" KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') -kubectl create secret generic kfk-kpi-data --namespace ${KFK_NAMESPACE} --type='Opaque' \ +kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ - --from-literal=KFK_SERVER_PORT=${KFK_NAMESPACE} + --from-literal=KFK_SERVER_PORT=${KFK_SERVER_PORT} printf "\n" echo "Create secret with NATS data" diff --git a/manifests/kpi_value_apiservice.yaml b/manifests/kpi_value_apiservice.yaml index 74eb90f67..e4dcb0054 100644 --- a/manifests/kpi_value_apiservice.yaml +++ b/manifests/kpi_value_apiservice.yaml @@ -39,6 +39,9 @@ spec: env: - name: LOG_LEVEL value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30020"] diff --git a/manifests/kpi_value_writerservice.yaml b/manifests/kpi_value_writerservice.yaml index 8a8e44ec2..e21e36f48 100644 --- a/manifests/kpi_value_writerservice.yaml +++ b/manifests/kpi_value_writerservice.yaml @@ -39,6 +39,9 @@ spec: env: - name: LOG_LEVEL value: "INFO" + envFrom: + - secretRef: + name: kfk-kpi-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30030"] -- GitLab From 45f1addf94e5a933728021f3051894f4d95b2467 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 14:49:57 +0000 Subject: [PATCH 05/14] dynamically creates the kafka server address with env variables. - get the values of KAFKA_NAMESPACE and KFK_SERVER_PORT to create KAFKA server address. --- src/common/tools/kafka/Variables.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 24ae2cff7..89ac42f90 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -16,14 +16,18 @@ import logging from enum import Enum from confluent_kafka import KafkaException from confluent_kafka.admin import AdminClient, NewTopic +from common.Settings import get_setting LOGGER = logging.getLogger(__name__) +KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' class KafkaConfig(Enum): + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') # SERVER_IP = "127.0.0.1:9092" - SERVER_IP = "kafka-service.kafka.svc.cluster.local:9092" - ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP}) + server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + ADMIN_CLIENT = AdminClient({'bootstrap.servers': server_address}) class KafkaTopic(Enum): REQUEST = 'topic_request' -- GitLab From 58408ee6479c01a920703c5e9fdec154f5a4a236 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 30 Jul 2024 15:14:40 +0000 Subject: [PATCH 06/14] Some improvements to Kpi Manager test and messages file. - comment is added in Kpi DB file for future reference. --- src/kpi_manager/database/Kpi_DB.py | 3 +- src/kpi_manager/tests/test_kpi_db.py | 4 +- src/kpi_manager/tests/test_kpi_manager.py | 148 +++++----------------- 3 files changed, 34 insertions(+), 121 deletions(-) diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py index 4b6064070..530abe457 100644 --- a/src/kpi_manager/database/Kpi_DB.py +++ b/src/kpi_manager/database/Kpi_DB.py @@ -34,14 +34,15 @@ class KpiDB: def create_database(self) -> None: if not sqlalchemy_utils.database_exists(self.db_engine.url): - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) sqlalchemy_utils.create_database(self.db_engine.url) + LOGGER.debug("Database created. {:}".format(self.db_engine.url)) def drop_database(self) -> None: if sqlalchemy_utils.database_exists(self.db_engine.url): sqlalchemy_utils.drop_database(self.db_engine.url) def create_tables(self): + # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables. try: KpiModel.metadata.create_all(self.db_engine) # type: ignore LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name)) diff --git a/src/kpi_manager/tests/test_kpi_db.py b/src/kpi_manager/tests/test_kpi_db.py index e961c12ba..d4a57f836 100644 --- a/src/kpi_manager/tests/test_kpi_db.py +++ b/src/kpi_manager/tests/test_kpi_db.py @@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__) def test_verify_databases_and_Tables(): LOGGER.info('>>> test_verify_Tables : START <<< ') kpiDBobj = KpiDB() - kpiDBobj.drop_database() - kpiDBobj.verify_tables() + # kpiDBobj.drop_database() + # kpiDBobj.verify_tables() kpiDBobj.create_database() kpiDBobj.create_tables() kpiDBobj.verify_tables() diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index f0d9526d3..da149e3fe 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -17,7 +17,7 @@ import os, pytest import logging from typing import Union -#from common.proto.context_pb2 import Empty +from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) @@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList from common.tools.service.GenericGrpcService import GenericGrpcService -#from context.client.ContextClient import ContextClient - -# from device.service.driver_api.DriverFactory import DriverFactory -# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache -# from device.service.DeviceService import DeviceService -# from device.client.DeviceClient import DeviceClient from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a from kpi_manager.service.KpiManagerService import KpiManagerService @@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient from kpi_manager.tests.test_messages import create_kpi_descriptor_request from kpi_manager.tests.test_messages import create_kpi_id_request - -#from monitoring.service.NameMapping import NameMapping - -#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE' -#from device.service.drivers import DRIVERS - ########################### # Tests Setup ########################### @@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT) -# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){} - LOGGER = logging.getLogger(__name__) class MockContextService(GenericGrpcService): @@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService): self.context_servicer = MockServicerImpl_Context() add_ContextServiceServicer_to_server(self.context_servicer, self.server) -# @pytest.fixture(scope='session') -# def context_service(): -# LOGGER.info('Initializing MockContextService...') -# _service = MockContextService(MOCKSERVICE_PORT) -# _service.start() - -# LOGGER.info('Yielding MockContextService...') -# yield _service - -# LOGGER.info('Terminating MockContextService...') -# _service.context_servicer.msg_broker.terminate() -# _service.stop() - -# LOGGER.info('Terminated MockContextService...') - -# @pytest.fixture(scope='session') -# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing ContextClient...') -# _client = ContextClient() - -# LOGGER.info('Yielding ContextClient...') -# yield _client - -# LOGGER.info('Closing ContextClient...') -# _client.close() - -# LOGGER.info('Closed ContextClient...') - -# @pytest.fixture(scope='session') -# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceService...') -# driver_factory = DriverFactory(DRIVERS) -# driver_instance_cache = DriverInstanceCache(driver_factory) -# _service = DeviceService(driver_instance_cache) -# _service.start() - -# # yield the server, when test finishes, execution will resume to stop it -# LOGGER.info('Yielding DeviceService...') -# yield _service - -# LOGGER.info('Terminating DeviceService...') -# _service.stop() - -# LOGGER.info('Terminated DeviceService...') - -# @pytest.fixture(scope='session') -# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceClient...') -# _client = DeviceClient() - -# LOGGER.info('Yielding DeviceClient...') -# yield _client - -# LOGGER.info('Closing DeviceClient...') -# _client.close() - -# LOGGER.info('Closed DeviceClient...') - -# @pytest.fixture(scope='session') -# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument -# LOGGER.info('Initializing DeviceClient...') -# _client = DeviceClient() - -# LOGGER.info('Yielding DeviceClient...') -# yield _client - -# LOGGER.info('Closing DeviceClient...') -# _client.close() - -# LOGGER.info('Closed DeviceClient...') - # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') def kpi_manager_service(): LOGGER.info('Initializing KpiManagerService...') - #name_mapping = NameMapping() - # _service = MonitoringService(name_mapping) - # _service = KpiManagerService(name_mapping) _service = KpiManagerService() _service.start() @@ -194,22 +106,22 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab ########################### # ---------- 3rd Iteration Tests ---------------- -# def test_SetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiId) +def test_SetKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") + response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + LOGGER.info("Response gRPC message object: {:}".format(response)) + assert isinstance(response, KpiId) -# def test_DeleteKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") -# # adding KPI -# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # deleting KPI -# del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) -# # select KPI -# kpi_manager_client.GetKpiDescriptor(response_id) -# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) -# assert isinstance(del_response, Empty) +def test_DeleteKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") + # adding KPI + response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # deleting KPI + del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) + # select KPI + kpi_manager_client.GetKpiDescriptor(response_id) + LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) + assert isinstance(del_response, Empty) def test_GetKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") @@ -225,21 +137,21 @@ def test_GetKpiDescriptor(kpi_manager_client): assert isinstance(response, KpiDescriptor) -# def test_SelectKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") -# # adding KPI -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # select KPI(s) -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptorList) +def test_SelectKpiDescriptor(kpi_manager_client): + LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") + # adding KPI + kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # select KPI(s) + response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) + LOGGER.info("Response gRPC message object: {:}".format(response)) + assert isinstance(response, KpiDescriptorList) -# def test_set_list_of_KPIs(kpi_manager_client): -# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") -# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] -# # adding KPI -# for kpi in KPIs_TO_SEARCH: -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) +def test_set_list_of_KPIs(kpi_manager_client): + LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") + KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] + # adding KPI + for kpi in KPIs_TO_SEARCH: + kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) # ---------- 2nd Iteration Tests ----------------- -- GitLab From bdcb5c01852b63399adffed7b635b19119cc73c1 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Wed, 31 Jul 2024 12:30:39 +0000 Subject: [PATCH 07/14] Changes to resolve Kafka server error - KFK_SERVER_ADDRESS_TEMPLATE now defined inside the class KafkaConfig. - variable renamed to "SERVER_ADDRESS" from "server_address" --- src/common/tools/kafka/Variables.py | 13 +++++++------ .../service/KpiValueApiServiceServicerImpl.py | 2 +- src/kpi_value_writer/service/KpiValueWriter.py | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 89ac42f90..9abc32b3e 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -20,14 +20,14 @@ from common.Settings import get_setting LOGGER = logging.getLogger(__name__) -KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' class KafkaConfig(Enum): - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - # SERVER_IP = "127.0.0.1:9092" - server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) - ADMIN_CLIENT = AdminClient({'bootstrap.servers': server_address}) + KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + # SERVER_ADDRESS = "127.0.0.1:9092" + SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) class KafkaTopic(Enum): REQUEST = 'topic_request' @@ -42,6 +42,7 @@ class KafkaTopic(Enum): Method to create Kafka topics defined as class members """ all_topics = [member.value for member in KafkaTopic] + LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value)) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): LOGGER.debug("All topics are created sucsessfully") return True diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index d27de54f3..1559457d7 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -38,7 +38,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.SERVER_IP.value + 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value }) for kpi_value in request.kpi_value_list: kpi_value_to_produce : Tuple [str, Any, Any] = ( diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 022126fd0..5e2b6babe 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -51,10 +51,10 @@ class KpiValueWriter(GenericGrpcService): metric_writer = MetricWriterToPrometheus() kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, + { 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, 'group.id' : __class__, 'auto.offset.reset' : 'latest'} - ) + ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) -- GitLab From 706777124ea90accd341480bb18b761c89473f17 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Thu, 1 Aug 2024 12:53:52 +0000 Subject: [PATCH 08/14] changes to manage Kafka enviornment variable efficiently - KFK_SERVER_PORT and KFK_REDOPLY added into my_deploy.sh file. - refines kafka env variables import --- my_deploy.sh | 5 +++++ src/common/tools/kafka/Variables.py | 8 ++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/my_deploy.sh b/my_deploy.sh index b89df7481..45e0d1301 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -181,3 +181,8 @@ export GRAF_EXT_PORT_HTTP="3000" # Set the namespace where Apache Kafka will be deployed. export KFK_NAMESPACE="kafka" +# Set the port Apache Kafka server will be exposed to. +export KFK_SERVER_PORT="9092" + +# Set the flag to YES for redeploying of Apache Kafka +export KFK_REDEPLOY="" diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 9abc32b3e..e3ee2016a 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -23,11 +23,11 @@ LOGGER = logging.getLogger(__name__) class KafkaConfig(Enum): KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - # SERVER_ADDRESS = "127.0.0.1:9092" + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + # SERVER_ADDRESS = "127.0.0.1:9092" SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) - ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) + ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) class KafkaTopic(Enum): REQUEST = 'topic_request' -- GitLab From 494b4cd712b559ce0d04225379d309dca07e2f07 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 09:41:59 +0000 Subject: [PATCH 09/14] Improvement in SelectKpiValues method. - Added "GetKpiSampleType" method to extract KpiSampleType based on KpiId. - Added PromtheusConnect method to query Prometheus from prometheus_api_client library. - KpiManagerClient added in DockerFile - prometheus_api_client added in requirement file. --- src/kpi_value_api/Dockerfile | 2 + src/kpi_value_api/requirements.in | 1 + .../service/KpiValueApiServiceServicerImpl.py | 93 ++++++++++++------- .../service/MetricWriterToPrometheus.py | 2 +- 4 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/kpi_value_api/Dockerfile b/src/kpi_value_api/Dockerfile index 7dd8d307b..25b8da931 100644 --- a/src/kpi_value_api/Dockerfile +++ b/src/kpi_value_api/Dockerfile @@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/kpi_value_api/. kpi_value_api/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py +COPY src/kpi_manager/client/. kpi_manager/client/ # Start the service ENTRYPOINT ["python", "-m", "kpi_value_api.service"] diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index 7e4694109..f5695906a 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -14,3 +14,4 @@ confluent-kafka==2.3.* requests==2.27.* +prometheus-api-client==0.5.3 \ No newline at end of file diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 1559457d7..b2ebecad0 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,18 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, requests +import logging, grpc from typing import Tuple, Any -from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType from confluent_kafka import Producer as KafkaProducer +from prometheus_api_client import PrometheusConnect +from prometheus_api_client.utils import parse_datetime + +from kpi_manager.client.KpiManagerClient import KpiManagerClient LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') @@ -63,40 +68,67 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> KpiValueList: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) response = KpiValueList() - metrics = [kpi.kpi_id for kpi in request.kpi_id] - start_timestamps = [timestamp for timestamp in request.start_timestamp] - end_timestamps = [timestamp for timestamp in request.end_timestamp] - results = [] + + kpi_manager_client = KpiManagerClient() + prom_connect = PrometheusConnect(url=PROM_URL) - for start, end in zip(start_timestamps, end_timestamps): - start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z" - end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z" + metrics = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi in request.kpi_id] + start_timestamps = [parse_datetime(timestamp) for timestamp in request.start_timestamp] + end_timestamps = [parse_datetime(timestamp) for timestamp in request.end_timestamp] + prom_response = [] + for start_time, end_time in zip(start_timestamps, end_timestamps): for metric in metrics: - url = f'{PROM_URL}/api/v1/query_range' - params = { - 'query': metric, - 'start': start_str, - 'end' : end_str, - 'step' : '30s' # or any other step you need - } - response = requests.get(url, params=params) - if response.status_code == 200: - data = response.json() - for result in data['data']['result']: - for value in result['values']: - kpi_value = KpiValue( - kpi_id=metric, - timestamp=str(seconds=value[0]), - kpi_value_type=self._convert_value_to_kpi_value_type(value[1]) - ) - results.append(kpi_value) + # print(start_time, end_time, metric) + prom_response.append( + prom_connect.custom_query_range( + query = metric, # this is the metric name and label config + start_time = start_time, + end_time = end_time, + step = 30, # or any other step value (missing in gRPC Filter request) + ) + ) + + for single_resposne in prom_response: + # print ("{:}".format(single_resposne)) + for record in single_resposne: + # print("Record >>> kpi: {:} >>> time & values set: {:}".format(record['metric']['__name__'], record['values'])) + for value in record['values']: + # print("{:} - {:}".format(record['metric']['__name__'], value)) + kpi_value = KpiValue() + kpi_value.kpi_id.kpi_id = record['metric']['__name__'], + kpi_value.timestamp = value[0], + kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + response.kpi_value_list.append(kpi_value) + return response + + def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): + print("--- START -----") - def _convert_value_to_kpi_value_type(self, value): + kpi_id = KpiId() + kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + # print("KpiId generated: {:}".format(kpi_id)) + + try: + kpi_descriptor_object = KpiDescriptor() + kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) + # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? + if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: + LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type) # extract and return the name of KpiSampleType + else: + LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + except Exception as e: + LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) + print ("Unable to get KpiDescriptor. Error: {:}".format(e)) + + def ConverValueToKpiValueType(self, value): # Check if the value is an integer (int64) try: - int64_value = int(value) - return KpiValueType(int64Val=int64_value) + int_value = int(value) + return KpiValueType(int64Val=int_value) except ValueError: pass # Check if the value is a float @@ -112,7 +144,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): # If none of the above, treat it as a string return KpiValueType(stringVal=value) - def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index b68116478..40bffa06e 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -93,4 +93,4 @@ class MetricWriterToPrometheus: print("Metric {:} is already registered. Skipping.".format(metric_name)) else: LOGGER.error("Error while pushing metric: {}".format(e)) - raise \ No newline at end of file + raise -- GitLab From aca1fe1b241353c4aad9e5fa7dccb99a131794a2 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 09:58:58 +0000 Subject: [PATCH 10/14] Temporarly defines the static value of env variables to test the working of microservice. - KFK_NAMESPACE and KFK_PORT --- src/common/tools/kafka/Variables.py | 6 ++++-- src/kpi_value_writer/service/MetricWriterToPrometheus.py | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index e3ee2016a..168957a26 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -23,8 +23,10 @@ LOGGER = logging.getLogger(__name__) class KafkaConfig(Enum): KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}' - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') + KFK_NAMESPACE = 'kafka' + # KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = '9092' + # KFK_PORT = get_setting('KFK_SERVER_PORT') # SERVER_ADDRESS = "127.0.0.1:9092" SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS }) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index 40bffa06e..81324b759 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -94,3 +94,4 @@ class MetricWriterToPrometheus: else: LOGGER.error("Error while pushing metric: {}".format(e)) raise + -- GitLab From d8687c1cdcbe4289c74bd814c0f510cbbe22547a Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 10:04:31 +0000 Subject: [PATCH 11/14] minor changes in code. - refine Kpi_DB.py methods. - improve description of messages. - imporve the text description. --- src/kpi_manager/database/Kpi_DB.py | 4 +--- src/kpi_value_api/tests/messages.py | 5 +++-- src/kpi_value_writer/service/MetricWriterToPrometheus.py | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py index 530abe457..49ad9c9b5 100644 --- a/src/kpi_manager/database/Kpi_DB.py +++ b/src/kpi_manager/database/Kpi_DB.py @@ -70,8 +70,7 @@ class KpiDB: session.rollback() if "psycopg2.errors.UniqueViolation" in str(e): LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) + raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] ) else: LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) @@ -90,7 +89,6 @@ class KpiDB: print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) return None except Exception as e: - session.rollback() LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) finally: diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index c2a1cbb0b..d8ad14bd4 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -18,8 +18,9 @@ from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() - # To run this experiment sucessfully, already existing UUID in KPI DB in necessary. - # because the UUID is used to get the descriptor form KPI DB. + # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB. + # This UUID is used to get the descriptor form the KPI DB. If the Kpi ID does not exists, + # some part of the code won't execute. EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09", str(uuid.uuid4()), str(uuid.uuid4())] diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index 81324b759..f1d079783 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -63,8 +63,8 @@ class MetricWriterToPrometheus: def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) - tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} # extracted values will be used as metric tag - metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] + tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} + metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags metric_name = cooked_kpi['kpi_sample_type'] try: if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists -- GitLab From 624a1817fc3faec5f99b044edf54b5d4f5281458 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 14:07:29 +0000 Subject: [PATCH 12/14] Cleaning unit test and messages files. - unused imports and functions are removed --- src/kpi_manager/tests/test_kpi_manager.py | 66 ------------------- .../tests/test_kpi_value_writer.py | 23 +------ 2 files changed, 1 insertion(+), 88 deletions(-) diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index da149e3fe..219fdadee 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -93,13 +93,6 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab # Prepare Environment, should be the first test ################################################## -# # ERROR on this test --- -# def test_prepare_environment( -# context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument -# ): -# context_id = json_context_id(DEFAULT_CONTEXT_NAME) -# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME))) -# context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id))) ########################### # Tests Implementation of Kpi Manager @@ -152,62 +145,3 @@ def test_set_list_of_KPIs(kpi_manager_client): # adding KPI for kpi in KPIs_TO_SEARCH: kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) - - -# ---------- 2nd Iteration Tests ----------------- -# def test_SetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ") -# with open("kpi_manager/tests/KPI_configs.json", 'r') as file: -# data = json.load(file) -# _descriptors = data.get('KPIs', []) -# for _descritor_name in _descriptors: -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name)) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiId) - -# def test_GetKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") -# response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptor) - -# def test_DeleteKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# del_response = kpi_manager_client.DeleteKpiDescriptor(response) -# kpi_manager_client.GetKpiDescriptor(response) -# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) -# assert isinstance(del_response, Empty) - -# def test_SelectKpiDescriptor(kpi_manager_client): -# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ") -# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a()) -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a()) -# LOGGER.info("Response gRPC message object: {:}".format(response)) -# assert isinstance(response, KpiDescriptorList) - -# ------------- INITIAL TESTs ---------------- -# Test case that makes use of client fixture to test server's CreateKpi method -# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name -# # make call to server -# LOGGER.warning('test_create_kpi requesting') -# for i in range(3): -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1))) -# LOGGER.debug(str(response)) -# assert isinstance(response, KpiId) - -# # Test case that makes use of client fixture to test server's DeleteKpi method -# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name -# # make call to server -# LOGGER.warning('delete_kpi requesting') -# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4')) -# response = kpi_manager_client.DeleteKpiDescriptor(response) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) - -# # Test case that makes use of client fixture to test server's GetKpiDescriptor method -# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name -# LOGGER.warning('test_selectkpidescritor begin') -# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, KpiDescriptorList) diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index 40593af97..fce043d7f 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -14,32 +14,12 @@ import logging from kpi_value_writer.service.KpiValueWriter import KpiValueWriter -from kpi_value_writer.tests.test_messages import create_kpi_id_request, create_kpi_descriptor_request from common.tools.kafka.Variables import KafkaTopic -from common.proto.kpi_manager_pb2 import KpiDescriptor -from kpi_manager.client.KpiManagerClient import KpiManagerClient -LOGGER = logging.getLogger(__name__) - -# def test_GetKpiDescriptor(): -# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") -# kpi_manager_client = KpiManagerClient() -# # adding KPI -# LOGGER.info(" --->>> calling SetKpiDescriptor ") -# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) -# # get KPI -# LOGGER.info(" --->>> calling GetKpiDescriptor with response ID") -# response = kpi_manager_client.GetKpiDescriptor(response_id) -# LOGGER.info("Response gRPC message object: {:}".format(response)) - -# LOGGER.info(" --->>> calling GetKpiDescriptor with random ID") -# rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) -# LOGGER.info("Response gRPC message object: {:}".format(rand_response)) -# LOGGER.info("\n------------------ TEST FINISHED ---------------------\n") -# assert isinstance(response, KpiDescriptor) +LOGGER = logging.getLogger(__name__) # -------- Initial Test ---------------- def test_validate_kafka_topics(): @@ -50,4 +30,3 @@ def test_validate_kafka_topics(): def test_KafkaConsumer(): LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") KpiValueWriter.RunKafkaConsumer() - -- GitLab From 793f7ffa826859536507b804e31738be453a53b6 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 14:09:52 +0000 Subject: [PATCH 13/14] Updated Promtheus URL - PROM_URL variable is updated with FQDN of Prometheus. --- src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index b2ebecad0..5e7c3d139 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -32,7 +32,7 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') -PROM_URL = "http://localhost:9090" +PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TODO: updated with the env variables class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): @@ -79,7 +79,8 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): prom_response = [] for start_time, end_time in zip(start_timestamps, end_timestamps): for metric in metrics: - # print(start_time, end_time, metric) + print(start_time, end_time, metric) + LOGGER.debug(">>> Query: {:}".format(metric)) prom_response.append( prom_connect.custom_query_range( query = metric, # this is the metric name and label config -- GitLab From 557efe071666d99c8f7ed34695c820368c494373 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 2 Aug 2024 15:15:03 +0000 Subject: [PATCH 14/14] changes in README files. - README files of kpi manager/value writer/api are updated to reflect new changes. --- src/kpi_manager/README.md | 27 +++++++++++---------------- src/kpi_value_api/README.md | 23 +++++++++++++++++++++++ src/kpi_value_writer/README.md | 32 ++++++++++---------------------- 3 files changed, 44 insertions(+), 38 deletions(-) create mode 100644 src/kpi_value_api/README.md diff --git a/src/kpi_manager/README.md b/src/kpi_manager/README.md index c1feadcc4..6e9b56d93 100644 --- a/src/kpi_manager/README.md +++ b/src/kpi_manager/README.md @@ -1,29 +1,24 @@ # How to locally run and test KPI manager micro-service -## --- File links need to be updated. --- ### Pre-requisets -The following requirements should be fulfilled before the execuation of KPI management service. +Ensure the following requirements are met before executing the KPI management service: -1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully. -2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully. -3. Verify the creation of required database and table. -[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and -[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment. +1. A virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/requirements.in) sucessfully installed. +2. Verify the creation of required database and table. The +[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_db.py) python file lists the functions to create tables and the database. The +[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/database/KpiEngine.py) file contains the DB string. ### Messages format templates -["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing. +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_messages.py) python file contains templates for creating gRPC messages. -### Test file -["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment. +### Unit test file +The ["KPI manager test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_manager.py) python file lists various tests conducted to validate functionality. ### Flow of execution (Kpi Maanager Service functions) 1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table. -2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. +2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor to the `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. -3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB. +3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from the DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from the DB. -4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. - -## For KPI composer and KPI writer -The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). +4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. diff --git a/src/kpi_value_api/README.md b/src/kpi_value_api/README.md new file mode 100644 index 000000000..70ba2c5e7 --- /dev/null +++ b/src/kpi_value_api/README.md @@ -0,0 +1,23 @@ +# How to locally run and test KPI Value API micro-service + +### Pre-requisets +Ensure the following requirements are met before executing the KPI Value API service. + +1. The KPI Manger service is running and Apache Kafka is running. + +2. A virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/requirements.in) file sucessfully installed. + +3. Call the ["create_all_topics()"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) function to verify the existence of all required topics on kafka. + +### Messages format templates +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/messages.py) python file contains templates for creating gRPC messages. + +### Unit test file +The ["KPI Value API test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/test_kpi_value_api.py) python file enlist various tests conducted to validate functionality. + +### Flow of execution (Kpi Maanager Service functions) +1. Call the `create_new_topic_if_not_exists()` method to create any new topics if needed. + +2. Call `StoreKpiValues(KpiValueList)` to produce `Kpi Value` on a Kafka Topic. (The `KpiValueWriter` microservice will consume and process the `Kpi Value`) + +3. Call `SelectKpiValues(KpiValueFilter) -> KpiValueList` to read metric from the Prometheus DB. diff --git a/src/kpi_value_writer/README.md b/src/kpi_value_writer/README.md index 72ba6e559..c45a0e395 100644 --- a/src/kpi_value_writer/README.md +++ b/src/kpi_value_writer/README.md @@ -1,29 +1,17 @@ -# How to locally run and test KPI manager micro-service +# How to locally run and test the KPI Value Writer micro-service -## --- File links need to be updated. --- ### Pre-requisets -The following requirements should be fulfilled before the execuation of KPI management service. +Ensure the following requirements are meet before executing the KPI Value Writer service> -1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully. -2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully. -3. Verify the creation of required database and table. -[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and -[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment. +1. The KPI Manger and KPI Value API services are running and Apache Kafka is running. -### Messages format templates -["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing. - -### Test file -["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment. - -### Flow of execution (Kpi Maanager Service functions) -1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table. +2. A Virtual enviornment exist with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/requirements.in) file installed sucessfully. -2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types. - -3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB. +### Messages format templates +The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_messages.py) python file contains the templates to create gRPC messages. -4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. +### Unit test file +The ["KPI Value API test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_kpi_value_writer.py) python file enlist various tests conducted to validate functionality. -## For KPI composer and KPI writer -The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in). \ No newline at end of file +### Flow of execution +1. Call the `RunKafkaConsumer` method from the `KpiValueWriter` class to start consuming the `KPI Value` generated by the `KPI Value API` or `Telemetry`. For every valid `KPI Value` consumer from Kafka, it invokes the `PrometheusWriter` class to prepare and push the metric to the Promethues DB. -- GitLab