From 3de3bf9865aabccc9d5a2712fb59ccb4250349ca Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Sun, 11 Aug 2024 15:18:06 +0000 Subject: [PATCH] Changes in KPI Manager KPI value API and value Writer. - updated cmd in test file of KPI manager - move kafka producer object to __init__ function. - write JSON object to Kafka - Read JSON object from Kafka - Slight to manage the affect of JSON object. - Static methods are removed. --- scripts/run_tests_locally-kpi-manager.sh | 2 +- src/common/tools/kafka/Variables.py | 2 +- .../service/KpiValueApiServiceServicerImpl.py | 42 ++++++++++++------- .../service/KpiValueWriter.py | 33 +++++++-------- .../service/MetricWriterToPrometheus.py | 12 +++--- .../tests/test_kpi_value_writer.py | 3 +- 6 files changed, 49 insertions(+), 45 deletions(-) diff --git a/scripts/run_tests_locally-kpi-manager.sh b/scripts/run_tests_locally-kpi-manager.sh index a6a24f90d..8a4ce8d95 100755 --- a/scripts/run_tests_locally-kpi-manager.sh +++ b/scripts/run_tests_locally-kpi-manager.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_manager.py diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 9d42f1550..5ada88a1e 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -55,7 +55,7 @@ class KafkaTopic(Enum): all_topics = [member.value for member in KafkaTopic] LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address())) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): - LOGGER.debug("All topics are created sucsessfully") + LOGGER.debug("All topics are created sucsessfully or Already Exists") return True else: LOGGER.debug("Error creating all topics") diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 3df8dd5b6..4ea978faf 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc -from typing import Tuple, Any +import logging, grpc, json +from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -37,32 +37,42 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') - + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) - producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address() - }) + + producer = self.kafka_producer for kpi_value in request.kpi_value_list: - kpi_value_to_produce : Tuple [str, Any, Any] = ( - kpi_value.kpi_id.kpi_id, - kpi_value.timestamp, - kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? - ) + kpi_value_to_produce : Dict = { + "kpi_uuid" : kpi_value.kpi_id.kpi_id.uuid, + "timestamp" : kpi_value.timestamp.timestamp, + "kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type) + } LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used - producer_obj.produce( + producer.produce( KafkaTopic.VALUE.value, key = msg_key, - value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce), + value = json.dumps(kpi_value_to_produce), callback = self.delivery_callback ) - producer_obj.flush() + producer.flush() return Empty() + def ExtractKpiValueByType(self, value): + attributes = [ 'floatVal' , 'int32Val' , 'uint32Val','int64Val', + 'uint64Val', 'stringVal', 'boolVal'] + for attr in attributes: + try: + return getattr(value, attr) + except (ValueError, TypeError, AttributeError): + continue + return None + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext ) -> KpiValueList: @@ -130,13 +140,13 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): try: int_value = int(value) return KpiValueType(int64Val=int_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a boolean if value.lower() in ['true', 'false']: diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index eba651674..8b258a142 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -38,28 +39,25 @@ class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER) super().__init__(port, cls_name=cls_name) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'KpiValueWriter', + 'auto.offset.reset' : 'latest'}) - @staticmethod - def RunKafkaConsumer(): - thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=()) + def RunKafkaConsumer(self): + thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() - @staticmethod - def KafkaKpiConsumer(): + def KafkaKpiConsumer(self): kpi_manager_client = KpiManagerClient() metric_writer = MetricWriterToPrometheus() - kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'KpiValueWriter', - 'auto.offset.reset' : 'latest'} - ) - kafka_consumer.subscribe([KafkaTopic.VALUE.value]) + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) while True: - raw_kpi = kafka_consumer.poll(1.0) + raw_kpi = consumer.poll(1.0) if raw_kpi is None: continue elif raw_kpi.error(): @@ -69,24 +67,21 @@ class KpiValueWriter(GenericGrpcService): print("Consumer error: {}".format(raw_kpi.error())) continue try: - kpi_value = KpiValue() - kpi_value.ParseFromString(raw_kpi.value()) + kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) - KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) + self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) except Exception as e: print("Error detail: {:}".format(e)) continue - @staticmethod - def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer): + def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): print("--- START -----") kpi_id = KpiId() - kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] print("KpiId generated: {:}".format(kpi_id)) # print("Kpi manger client created: {:}".format(kpi_manager_client)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index a86b8f34e..85e618a4b 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,10 +14,8 @@ # read Kafka stream from Kafka topic -# import ast -# import time -# import threading import logging +from typing import Dict from prometheus_client import Gauge from common.proto.kpi_sample_types_pb2 import KpiSampleType @@ -47,13 +45,13 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value.timestamp.timestamp, - 'kpi_value' : kpi_value.kpi_value_type.floatVal + 'time_stamp' : kpi_value['timestamp'], + 'kpi_value' : kpi_value['kpi_value_type'] } # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi - def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): + def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} @@ -76,7 +74,7 @@ class MetricWriterToPrometheus: connection_id = cooked_kpi['connection_id'], link_id = cooked_kpi['link_id'], time_stamp = cooked_kpi['time_stamp'], - ).set(float(cooked_kpi['kpi_value'])) + ).set(cooked_kpi['kpi_value']) LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) except ValueError as e: diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index fce043d7f..b784fae5d 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -29,4 +29,5 @@ def test_validate_kafka_topics(): def test_KafkaConsumer(): LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") - KpiValueWriter.RunKafkaConsumer() + kpi_value_writer = KpiValueWriter() + kpi_value_writer.RunKafkaConsumer() -- GitLab