diff --git a/scripts/run_tests_locally-kpi-manager.sh b/scripts/run_tests_locally-kpi-manager.sh index a6a24f90db93d56300ac997bd00675c479ef13ae..8a4ce8d95c74657451147078a1d93e891dfc2ac8 100755 --- a/scripts/run_tests_locally-kpi-manager.sh +++ b/scripts/run_tests_locally-kpi-manager.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_manager.py diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index 9d42f1550d29e9a2abc9a9bf9b8fc054350e85ec..5ada88a1ea0a7eae31eda741d81757fa624521de 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -55,7 +55,7 @@ class KafkaTopic(Enum): all_topics = [member.value for member in KafkaTopic] LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address())) if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): - LOGGER.debug("All topics are created sucsessfully") + LOGGER.debug("All topics are created sucsessfully or Already Exists") return True else: LOGGER.debug("Error creating all topics") diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 3df8dd5b66f36fe69ef9ada5d5fe882fc16abeb2..4ea978fafc8d7454d41f64182d553d030215113a 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc -from typing import Tuple, Any +import logging, grpc, json +from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -37,32 +37,42 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') - + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) - producer_obj = KafkaProducer({ - 'bootstrap.servers' : KafkaConfig.get_kafka_address() - }) + + producer = self.kafka_producer for kpi_value in request.kpi_value_list: - kpi_value_to_produce : Tuple [str, Any, Any] = ( - kpi_value.kpi_id.kpi_id, - kpi_value.timestamp, - kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how? - ) + kpi_value_to_produce : Dict = { + "kpi_uuid" : kpi_value.kpi_id.kpi_id.uuid, + "timestamp" : kpi_value.timestamp.timestamp, + "kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type) + } LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce)) msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used - producer_obj.produce( + producer.produce( KafkaTopic.VALUE.value, key = msg_key, - value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce), + value = json.dumps(kpi_value_to_produce), callback = self.delivery_callback ) - producer_obj.flush() + producer.flush() return Empty() + def ExtractKpiValueByType(self, value): + attributes = [ 'floatVal' , 'int32Val' , 'uint32Val','int64Val', + 'uint64Val', 'stringVal', 'boolVal'] + for attr in attributes: + try: + return getattr(value, attr) + except (ValueError, TypeError, AttributeError): + continue + return None + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext ) -> KpiValueList: @@ -130,13 +140,13 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): try: int_value = int(value) return KpiValueType(int64Val=int_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a float try: float_value = float(value) return KpiValueType(floatVal=float_value) - except ValueError: + except (ValueError, TypeError): pass # Check if the value is a boolean if value.lower() in ['true', 'false']: diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index eba6516747bfe3b8a8e983967d47ec21d26bc8a4..8b258a1424cc44be4dcb9134ee913c707cc44bfa 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import logging import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic @@ -38,28 +39,25 @@ class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER) super().__init__(port, cls_name=cls_name) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'KpiValueWriter', + 'auto.offset.reset' : 'latest'}) - @staticmethod - def RunKafkaConsumer(): - thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=()) + def RunKafkaConsumer(self): + thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() - @staticmethod - def KafkaKpiConsumer(): + def KafkaKpiConsumer(self): kpi_manager_client = KpiManagerClient() metric_writer = MetricWriterToPrometheus() - kafka_consumer = KafkaConsumer( - { 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'KpiValueWriter', - 'auto.offset.reset' : 'latest'} - ) - kafka_consumer.subscribe([KafkaTopic.VALUE.value]) + consumer = self.kafka_consumer + consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) while True: - raw_kpi = kafka_consumer.poll(1.0) + raw_kpi = consumer.poll(1.0) if raw_kpi is None: continue elif raw_kpi.error(): @@ -69,24 +67,21 @@ class KpiValueWriter(GenericGrpcService): print("Consumer error: {}".format(raw_kpi.error())) continue try: - kpi_value = KpiValue() - kpi_value.ParseFromString(raw_kpi.value()) + kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) - KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) + self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) except Exception as e: print("Error detail: {:}".format(e)) continue - @staticmethod - def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer): + def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): print("--- START -----") kpi_id = KpiId() - kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] print("KpiId generated: {:}".format(kpi_id)) # print("Kpi manger client created: {:}".format(kpi_manager_client)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index a86b8f34e462a2703f5dfa4ece947613978bdef9..85e618a4b5b330cb83cf255652e7be8dff2dabd3 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,10 +14,8 @@ # read Kafka stream from Kafka topic -# import ast -# import time -# import threading import logging +from typing import Dict from prometheus_client import Gauge from common.proto.kpi_sample_types_pb2 import KpiSampleType @@ -47,13 +45,13 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value.timestamp.timestamp, - 'kpi_value' : kpi_value.kpi_value_type.floatVal + 'time_stamp' : kpi_value['timestamp'], + 'kpi_value' : kpi_value['kpi_value_type'] } # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi - def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): + def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} @@ -76,7 +74,7 @@ class MetricWriterToPrometheus: connection_id = cooked_kpi['connection_id'], link_id = cooked_kpi['link_id'], time_stamp = cooked_kpi['time_stamp'], - ).set(float(cooked_kpi['kpi_value'])) + ).set(cooked_kpi['kpi_value']) LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) except ValueError as e: diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index fce043d7fd6c9b5cbb9374d0b059cb1e2fa65a24..b784fae5da713f9bd7cd7a1668f48b080f7a84fa 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -29,4 +29,5 @@ def test_validate_kafka_topics(): def test_KafkaConsumer(): LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") - KpiValueWriter.RunKafkaConsumer() + kpi_value_writer = KpiValueWriter() + kpi_value_writer.RunKafkaConsumer()