Commit 3de3bf98 authored by Waleed Akbar

Changes in KPI Manager, KPI Value API, and KPI Value Writer.

- Updated the command in the KPI Manager test file.
- Moved the Kafka producer object to the __init__ function.
- Write JSON objects to Kafka.
- Read JSON objects from Kafka.
- Slight changes to handle the effect of the JSON object format (see the sketch below).
- Removed static methods.
parent 622fe067
Related merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
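In outline, the payload on the Kafka VALUE topic changes from a protobuf-serialized message to a JSON-encoded dict. A minimal sketch of the new round-trip, assuming confluent_kafka and an illustrative broker address and topic name (the real values come from KafkaConfig and KafkaTopic):

import json
from confluent_kafka import Producer, Consumer

producer = Producer({'bootstrap.servers': 'localhost:9092'})    # illustrative address
payload = {'kpi_uuid': '1234', 'timestamp': 1718000000.0, 'kpi_value_type': 42.0}
producer.produce('topic_value', key='gRPC-kpivalueapi', value=json.dumps(payload))
producer.flush()

consumer = Consumer({'bootstrap.servers': 'localhost:9092',     # illustrative address
                     'group.id': 'KpiValueWriter',
                     'auto.offset.reset': 'latest'})
consumer.subscribe(['topic_value'])
msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    kpi_value = json.loads(msg.value().decode('utf-8'))         # a dict, not a protobuf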
@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_manager/tests/test_kpi_manager.py
@@ -55,7 +55,7 @@ class KafkaTopic(Enum):
all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully")
LOGGER.debug("All topics are created sucsessfully or Already Exists")
return True
else:
LOGGER.debug("Error creating all topics")
......
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc
from typing import Tuple, Any
import logging, grpc, json
from typing import Dict
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -37,32 +37,42 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
LOGGER.debug('Init KpiValueApiService')
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
) -> Empty:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
producer_obj = KafkaProducer({
'bootstrap.servers' : KafkaConfig.get_kafka_address()
})
producer = self.kafka_producer
for kpi_value in request.kpi_value_list:
kpi_value_to_produce : Tuple [str, Any, Any] = (
kpi_value.kpi_id.kpi_id,
kpi_value.timestamp,
kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how?
)
kpi_value_to_produce : Dict = {
"kpi_uuid" : kpi_value.kpi_id.kpi_id.uuid,
"timestamp" : kpi_value.timestamp.timestamp,
"kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type)
}
LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used
producer_obj.produce(
producer.produce(
KafkaTopic.VALUE.value,
key = msg_key,
value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce),
value = json.dumps(kpi_value_to_produce),
callback = self.delivery_callback
)
producer_obj.flush()
producer.flush()
return Empty()
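A note on the producer refactor above: creating the Producer once in __init__ and reusing it across RPC calls avoids rebuilding the client (and its broker connections) on every StoreKpiValues invocation, while flush() per request keeps delivery synchronous. A minimal sketch of the pattern, with illustrative names:

import json
from confluent_kafka import Producer

class ValueApiSketch:
    def __init__(self):
        # one long-lived producer per servicer instance
        self.kafka_producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def store(self, payload: dict):
        self.kafka_producer.produce('topic_value', value=json.dumps(payload))
        self.kafka_producer.flush()   # blocks until delivery; simple but adds latency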
def ExtractKpiValueByType(self, value):
attributes = [ 'floatVal' , 'int32Val' , 'uint32Val','int64Val',
'uint64Val', 'stringVal', 'boolVal']
for attr in attributes:
try:
return getattr(value, attr)
except (ValueError, TypeError, AttributeError):
continue
return None
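One caveat worth flagging for ExtractKpiValueByType: on a protobuf message, getattr succeeds even for unset oneof fields and returns the type's default (0, 0.0, '', False), so the loop above will typically return floatVal's default rather than raise. If kpi_value_type is a protobuf oneof (the oneof name 'value' below is an assumption to check against the .proto), WhichOneof gives a precise alternative:

def extract_kpi_value(kpi_value_type):
    # WhichOneof returns the name of the field that is actually set, or None
    field = kpi_value_type.WhichOneof('value')   # 'value' is an assumed oneof name
    if field is None:
        return None
    return getattr(kpi_value_type, field)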
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext
) -> KpiValueList:
@@ -130,13 +140,13 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
try:
int_value = int(value)
return KpiValueType(int64Val=int_value)
except ValueError:
except (ValueError, TypeError):
pass
# Check if the value is a float
try:
float_value = float(value)
return KpiValueType(floatVal=float_value)
except ValueError:
except (ValueError, TypeError):
pass
# Check if the value is a boolean
if value.lower() in ['true', 'false']:
......
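For context on the broadened except clauses above: int() and float() raise ValueError for unparseable strings but TypeError for non-string, non-numeric inputs such as None, so catching both covers malformed and missing values. A quick illustration:

int('abc')    # raises ValueError
float('abc')  # raises ValueError
int(None)     # raises TypeError, not ValueError
float(None)   # raises TypeError, not ValueError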
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -38,28 +39,25 @@ class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER)
super().__init__(port, cls_name=cls_name)
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'KpiValueWriter',
'auto.offset.reset' : 'latest'})
@staticmethod
def RunKafkaConsumer():
thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=())
def RunKafkaConsumer(self):
thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
ACTIVE_CONSUMERS.append(thread)
thread.start()
@staticmethod
def KafkaKpiConsumer():
def KafkaKpiConsumer(self):
kpi_manager_client = KpiManagerClient()
metric_writer = MetricWriterToPrometheus()
kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'KpiValueWriter',
'auto.offset.reset' : 'latest'}
)
kafka_consumer.subscribe([KafkaTopic.VALUE.value])
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
while True:
raw_kpi = kafka_consumer.poll(1.0)
raw_kpi = consumer.poll(1.0)
if raw_kpi is None:
continue
elif raw_kpi.error():
@@ -69,24 +67,21 @@ class KpiValueWriter(GenericGrpcService):
print("Consumer error: {}".format(raw_kpi.error()))
continue
try:
kpi_value = KpiValue()
kpi_value.ParseFromString(raw_kpi.value())
kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
except Exception as e:
print("Error detail: {:}".format(e))
continue
@staticmethod
def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer):
def get_kpi_descriptor(self, kpi_value: dict, kpi_manager_client, metric_writer):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
print("KpiId generated: {:}".format(kpi_id))
# print("Kpi manger client created: {:}".format(kpi_manager_client))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
......
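The consumer loop above follows the standard confluent_kafka polling idiom: poll() returns None on timeout, a message whose error() is non-None for transport or partition events, and otherwise a message whose value() bytes can be decoded. A minimal hedged sketch of that idiom, independent of this component:

from confluent_kafka import Consumer, KafkaError

def consume_forever(consumer, handle):
    while True:
        msg = consumer.poll(1.0)        # None when the timeout expires
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF only means the consumer caught up with the log
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print("Consumer error: {}".format(msg.error()))
            continue
        handle(msg.value())             # handle is a caller-supplied callback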
@@ -14,10 +14,8 @@
# read Kafka stream from Kafka topic
# import ast
# import time
# import threading
import logging
from typing import Dict
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
@@ -47,13 +45,13 @@ class MetricWriterToPrometheus:
'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid,
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value.timestamp.timestamp,
'kpi_value' : kpi_value.kpi_value_type.floatVal
'time_stamp' : kpi_value['timestamp'],
'kpi_value' : kpi_value['kpi_value_type']
}
# LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict):
# merge the KPI descriptor and the KPI value into a single variable.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
@@ -76,7 +74,7 @@ class MetricWriterToPrometheus:
connection_id = cooked_kpi['connection_id'],
link_id = cooked_kpi['link_id'],
time_stamp = cooked_kpi['time_stamp'],
).set(float(cooked_kpi['kpi_value']))
).set(cooked_kpi['kpi_value'])
LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
except ValueError as e:
......
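For context on the Prometheus push above: PROM_METRICS[metric_name] is a labelled Gauge, labels() selects a child time series, and set() records the sample. set() expects a number, which is why dropping the float() cast is only safe if the JSON payload is guaranteed numeric. A minimal sketch with illustrative metric and label names:

from prometheus_client import Gauge

gauge = Gauge('tfs_kpi_value', 'KPI value sample', ['kpi_id', 'device_id'])  # illustrative
gauge.labels(kpi_id='1234', device_id='dev-1').set(42.0)   # set() wants a numeric value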
@@ -29,4 +29,5 @@ def test_validate_kafka_topics():
def test_KafkaConsumer():
LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
KpiValueWriter.RunKafkaConsumer()
kpi_value_writer = KpiValueWriter()
kpi_value_writer.RunKafkaConsumer()