Skip to content
Snippets Groups Projects
Commit 143f8742 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes to KpiValueAPI for Processing Alarms:

- Added the `GetKpiAlarms` method and `KpiAlarm` message in the proto file.
- Updated the client to handle the `GetKpiAlarms` method call.
- Implemented the `GetKpiAlarms` method (ResponseListener).
- Updated `ConvertValueToKpiValueType` to properly convert the value to the gRPC `KpiValueType` message.
- Updated test and messages files.
parent 798c1f47
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!266Resolve: "Unable to correctly extract the aggregation function names from the dictionary received as parameters in the Analyzer message"
......@@ -19,8 +19,9 @@ import "context.proto";
import "kpi_manager.proto";
service KpiValueAPIService {
rpc StoreKpiValues (KpiValueList) returns (context.Empty) {}
rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {}
rpc StoreKpiValues (KpiValueList) returns (context.Empty) {}
rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {}
rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {}
}
message KpiValue {
......@@ -50,3 +51,10 @@ message KpiValueFilter {
repeated context.Timestamp start_timestamp = 2;
repeated context.Timestamp end_timestamp = 3;
}
message KpiAlarms {
context.Timestamp start_timestamp = 1;
context.Timestamp end_timestamp = 2;
kpi_manager.KpiId kpi_id = 3;
map<string, bool> alarms = 4;
}
\ No newline at end of file
......@@ -15,17 +15,18 @@
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiValueApiClient:
......@@ -34,8 +35,8 @@ class KpiValueApiClient:
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
......@@ -46,18 +47,25 @@ class KpiValueApiClient:
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
self.stub = None
@RETRY_DECORATOR
def StoreKpiValues(self, request: KpiValueList) -> Empty:
def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore
LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StoreKpiValues(request)
LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList:
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore
LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectKpiValues(request)
LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore
LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetKpiAlarms(request)
LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
return response
......@@ -12,18 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json
from datetime import datetime
import logging, grpc, json, queue
from typing import Dict
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import KafkaError
from common.proto.context_pb2 import Empty
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from prometheus_api_client import PrometheusConnect
from prometheus_api_client.utils import parse_datetime
......@@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
LOGGER.debug('Init KpiValueApiService')
self.listener_topic = KafkaTopic.ALARMS.value
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
) -> Empty:
......@@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
kpi_value = KpiValue()
kpi_value.kpi_id.kpi_id = record['metric']['__name__'],
kpi_value.timestamp = value[0],
kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
response.kpi_value_list.append(kpi_value)
return response
def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
# print("KpiId generated: {:}".format(kpi_id))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
......@@ -135,26 +142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))
def ConverValueToKpiValueType(self, value):
# Check if the value is an integer (int64)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore
"""
Get Alarms from Kafka return Alrams periodically.
"""
LOGGER.debug('GetKpiAlarms: {:}'.format(request))
response = KpiAlarms()
for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
response.start_timestamp.timestamp = datetime.strptime(
value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.end_timestamp.timestamp = datetime.strptime(
value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.kpi_id.kpi_id.uuid = value['kpi_id']
for key, threshold in value.items():
if "THRESHOLD_" in key:
response.alarms[key] = threshold
yield response
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
int_value = int(value)
return KpiValueType(int64Val=int_value)
except (ValueError, TypeError):
pass
# Check if the value is a float
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
float_value = float(value)
return KpiValueType(floatVal=float_value)
except (ValueError, TypeError):
pass
# Check if the value is a boolean
if value.lower() in ['true', 'false']:
bool_value = value.lower() == 'true'
return KpiValueType(boolVal=bool_value)
# If none of the above, treat it as a string
return KpiValueType(stringVal=value)
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
def ConverValueToKpiValueType(self, value):
kpi_value_type = KpiValueType()
if isinstance(value, int):
kpi_value_type.int32Val = value
elif isinstance(value, float):
kpi_value_type.floatVal = value
elif isinstance(value, str):
kpi_value_type.stringVal = value
elif isinstance(value, bool):
kpi_value_type.boolVal = value
# Add other checks for different types as needed
return kpi_value_type
......@@ -13,9 +13,16 @@
# limitations under the License.
import uuid, time
from common.proto import kpi_manager_pb2
from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
def create_kpi_id_request():
_create_kpi_id = kpi_manager_pb2.KpiId()
_create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
return _create_kpi_id
def create_kpi_value_list():
_create_kpi_value_list = KpiValueList()
# To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment