Commit 1bb6e1bb authored by Waleed Akbar

Changes in KPI Manager, KPI Value API, and KPI Value Writer.

- Updated the test command in the KPI Manager test script.
- Moved the Kafka producer object into the __init__ method.
- Write JSON objects to Kafka.
- Read JSON objects from Kafka.
- Slight changes to handle the effects of using JSON objects.
- Removed static methods.
parent 9119dd59
2 merge requests: !294 "Release TeraFlowSDN 4.0", !258 "Resolve 'Re-designing of Telemetry'"
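The central change is replacing protobuf-serialized Kafka payloads with plain JSON. As orientation for the diffs below, here is a minimal, self-contained sketch of that publish path using confluent_kafka; the broker address, topic name, and KPI values are illustrative, not taken from the repository.

import json
from confluent_kafka import Producer

def delivery_callback(err, msg):
    # Report per-message delivery success or failure.
    if err is not None:
        print('Delivery failed: {:}'.format(err))
    else:
        print('Delivered to {:} [{:}]'.format(msg.topic(), msg.partition()))

# Created once and reused, mirroring the move of the producer into __init__.
producer = Producer({'bootstrap.servers': 'localhost:9092'})

kpi_value_to_produce = {
    'kpi_uuid'       : '123e4567-e89b-12d3-a456-426614174000',  # illustrative UUID
    'timestamp'      : 1719846400.0,
    'kpi_value_type' : 12.5,
}
# JSON instead of protobuf SerializeToString(): the consumer can decode
# the payload with json.loads() and no generated proto classes.
producer.produce('topic_value', key='gRPC-kpivalueapi',
                 value=json.dumps(kpi_value_to_produce),
                 callback=delivery_callback)
producer.flush()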
@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_manager/tests/test_kpi_manager.py
@@ -55,7 +55,7 @@ class KafkaTopic(Enum):
         all_topics = [member.value for member in KafkaTopic]
         LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
         if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
-            LOGGER.debug("All topics are created sucsessfully")
+            LOGGER.debug("All topics are created successfully or already exist")
             return True
         else:
             LOGGER.debug("Error creating all topics")
...
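create_new_topic_if_not_exists itself is not part of this diff. As a rough sketch of the check-then-create pattern its name suggests, using confluent_kafka's AdminClient (broker address, topic name, and partition/replication settings are all assumptions):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Topics already known to the broker.
existing = set(admin.list_topics(timeout=5).topics.keys())
wanted = [topic for topic in ['topic_value'] if topic not in existing]

if wanted:
    # create_topics() is asynchronous and returns one future per topic.
    futures = admin.create_topics(
        [NewTopic(t, num_partitions=1, replication_factor=1) for t in wanted])
    for topic, future in futures.items():
        try:
            future.result()  # raises if creation failed
            print('Topic {:} created'.format(topic))
        except Exception as e:
            print('Failed to create topic {:}: {:}'.format(topic, e))
else:
    print('All topics already exist')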
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging, grpc
-from typing import Tuple, Any
+import logging, grpc, json
+from typing import Dict
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -37,32 +37,42 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO
 class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
     def __init__(self):
         LOGGER.debug('Init KpiValueApiService')
+        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})

     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                        ) -> Empty:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
-        producer_obj = KafkaProducer({
-            'bootstrap.servers' : KafkaConfig.get_kafka_address()
-        })
+        producer = self.kafka_producer
         for kpi_value in request.kpi_value_list:
-            kpi_value_to_produce : Tuple [str, Any, Any] = (
-                kpi_value.kpi_id.kpi_id,
-                kpi_value.timestamp,
-                kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how?
-            )
+            kpi_value_to_produce : Dict = {
+                "kpi_uuid"       : kpi_value.kpi_id.kpi_id.uuid,
+                "timestamp"      : kpi_value.timestamp.timestamp,
+                "kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type)
+            }
             LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
             msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used

-            producer_obj.produce(
+            producer.produce(
                 KafkaTopic.VALUE.value,
                 key = msg_key,
-                value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce),
+                value = json.dumps(kpi_value_to_produce),
                 callback = self.delivery_callback
             )
-            producer_obj.flush()
+            producer.flush()
         return Empty()

+    def ExtractKpiValueByType(self, value):
+        attributes = [ 'floatVal' , 'int32Val' , 'uint32Val', 'int64Val',
+                       'uint64Val', 'stringVal', 'boolVal' ]
+        for attr in attributes:
+            try:
+                return getattr(value, attr)
+            except (ValueError, TypeError, AttributeError):
+                continue
+        return None
+
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext
                        ) -> KpiValueList:
@@ -130,13 +140,13 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         try:
             int_value = int(value)
             return KpiValueType(int64Val=int_value)
-        except ValueError:
+        except (ValueError, TypeError):
             pass
         # Check if the value is a float
         try:
             float_value = float(value)
             return KpiValueType(floatVal=float_value)
-        except ValueError:
+        except (ValueError, TypeError):
             pass
         # Check if the value is a boolean
         if value.lower() in ['true', 'false']:
...
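One caveat on the new ExtractKpiValueByType: for a protobuf message, getattr on an unset field returns that field's default value rather than raising, so the attribute loop may return floatVal's default (0.0) even when another field is populated. If KpiValueType keeps its fields in a oneof, WhichOneof is the usual way to find the populated field; a minimal sketch, assuming the oneof group is named 'value' (an assumption, not confirmed by this diff):

def extract_kpi_value(kpi_value_type):
    # WhichOneof returns the name of the populated field, or None if unset.
    field = kpi_value_type.WhichOneof('value')
    if field is None:
        return None
    return getattr(kpi_value_type, field)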
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import logging
 import threading
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -38,28 +39,25 @@ class KpiValueWriter(GenericGrpcService):
     def __init__(self, cls_name : str = __name__) -> None:
         port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER)
         super().__init__(port, cls_name=cls_name)
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                             'group.id'          : 'KpiValueWriter',
+                                             'auto.offset.reset' : 'latest'})

-    @staticmethod
-    def RunKafkaConsumer():
-        thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=())
+    def RunKafkaConsumer(self):
+        thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
         ACTIVE_CONSUMERS.append(thread)
         thread.start()

-    @staticmethod
-    def KafkaKpiConsumer():
+    def KafkaKpiConsumer(self):
         kpi_manager_client = KpiManagerClient()
         metric_writer = MetricWriterToPrometheus()
-        kafka_consumer = KafkaConsumer(
-            { 'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-              'group.id'          : 'KpiValueWriter',
-              'auto.offset.reset' : 'latest'}
-        )
-        kafka_consumer.subscribe([KafkaTopic.VALUE.value])
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.VALUE.value])
         LOGGER.debug("Kafka Consumer start listening on topic: {:}".format(KafkaTopic.VALUE.value))
         print("Kafka Consumer start listening on topic: {:}".format(KafkaTopic.VALUE.value))
         while True:
-            raw_kpi = kafka_consumer.poll(1.0)
+            raw_kpi = consumer.poll(1.0)
             if raw_kpi is None:
                 continue
             elif raw_kpi.error():
@@ -69,24 +67,21 @@ class KpiValueWriter(GenericGrpcService):
                 print("Consumer error: {}".format(raw_kpi.error()))
                 continue
             try:
-                kpi_value = KpiValue()
-                kpi_value.ParseFromString(raw_kpi.value())
+                kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
                 LOGGER.info("Received KPI : {:}".format(kpi_value))
                 print("Received KPI : {:}".format(kpi_value))
-                KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
+                self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
             except Exception as e:
                 print("Error detail: {:}".format(e))
                 continue

-    @staticmethod
-    def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer):
+    def get_kpi_descriptor(self, kpi_value, kpi_manager_client, metric_writer):
         print("--- START -----")
         kpi_id = KpiId()
-        kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
+        kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
         print("KpiId generated: {:}".format(kpi_id))
         # print("Kpi manger client created: {:}".format(kpi_manager_client))
         try:
             kpi_descriptor_object = KpiDescriptor()
             kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
...
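For reference, the consume-and-decode loop above reduces to the following standalone pattern with confluent_kafka; broker address, topic, and group id are illustrative.

import json
from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers' : 'localhost:9092',
                     'group.id'          : 'KpiValueWriter',
                     'auto.offset.reset' : 'latest'})
consumer.subscribe(['topic_value'])

while True:
    msg = consumer.poll(1.0)       # wait up to 1 s for a message
    if msg is None:
        continue                   # no message within the timeout
    if msg.error():
        print('Consumer error: {:}'.format(msg.error()))
        continue
    # The producer wrote json.dumps(...), so the payload decodes to a dict
    # with the keys kpi_uuid, timestamp, and kpi_value_type.
    kpi_value = json.loads(msg.value().decode('utf-8'))
    print('Received KPI: {:}'.format(kpi_value))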
@@ -14,10 +14,8 @@
 # read Kafka stream from Kafka topic

-# import ast
-# import time
-# import threading
 import logging
+from typing import Dict
 from prometheus_client import Gauge
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
@@ -47,13 +45,13 @@ class MetricWriterToPrometheus:
             'slice_id'      : kpi_descriptor.slice_id.slice_uuid.uuid,
             'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
             'link_id'       : kpi_descriptor.link_id.link_uuid.uuid,
-            'time_stamp'    : kpi_value.timestamp.timestamp,
-            'kpi_value'     : kpi_value.kpi_value_type.floatVal
+            'time_stamp'    : kpi_value['timestamp'],
+            'kpi_value'     : kpi_value['kpi_value_type']
         }
         # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
         return cooked_kpi

-    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
+    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict):
         # merge both messages into a single variable.
         cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
         tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
@@ -76,7 +74,7 @@ class MetricWriterToPrometheus:
                 connection_id = cooked_kpi['connection_id'],
                 link_id       = cooked_kpi['link_id'],
                 time_stamp    = cooked_kpi['time_stamp'],
-            ).set(float(cooked_kpi['kpi_value']))
+            ).set(cooked_kpi['kpi_value'])
             LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
         except ValueError as e:
...
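The .labels(...).set(...) call above is the standard prometheus_client pattern for labelled gauges; a minimal sketch with illustrative metric and label names:

from prometheus_client import Gauge, start_http_server

# One Gauge per KPI sample type, with descriptor fields as labels.
PACKETS_RECEIVED = Gauge('tfs_packets_received', 'Packets received on a device endpoint',
                         ['device_id', 'endpoint_id'])

start_http_server(8000)  # expose /metrics on port 8000
PACKETS_RECEIVED.labels(device_id='dev-1', endpoint_id='ep-1').set(12.5)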
@@ -29,4 +29,5 @@ def test_validate_kafka_topics():
 def test_KafkaConsumer():
     LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
-    KpiValueWriter.RunKafkaConsumer()
+    kpi_value_writer = KpiValueWriter()
+    kpi_value_writer.RunKafkaConsumer()