diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index dff96272e3d05756dd19a49ecaede7311b196540..8b5886a69667a58cb73f4a4df0e84060ab3710a5 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,8 +19,9 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} - rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} + rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } message KpiValue { @@ -50,3 +51,10 @@ message KpiValueFilter { repeated context.Timestamp start_timestamp = 2; repeated context.Timestamp end_timestamp = 3; } + +message KpiAlarms { + context.Timestamp start_timestamp = 1; + context.Timestamp end_timestamp = 2; + kpi_manager.KpiId kpi_id = 3; + map<string, bool> alarms = 4; +} \ No newline at end of file diff --git a/src/kpi_value_api/client/KpiValueApiClient.py b/src/kpi_value_api/client/KpiValueApiClient.py index f432271cfb7c8136f72156330b25d0b82b934d99..dfc5f07254a30db34a20ee8d0eae931cfd0ce571 100644 --- a/src/kpi_value_api/client/KpiValueApiClient.py +++ b/src/kpi_value_api/client/KpiValueApiClient.py @@ -15,17 +15,18 @@ import grpc, logging from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import retry, delay_exponential -from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Empty -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import Empty +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub -LOGGER = logging.getLogger(__name__) -MAX_RETRIES = 10 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 10 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class KpiValueApiClient: @@ -34,8 +35,8 @@ class KpiValueApiClient: if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) - self.channel = None - self.stub = None + self.channel = None + self.stub = None self.connect() LOGGER.debug('Channel created') @@ -46,18 +47,25 @@ class KpiValueApiClient: def close(self): if self.channel is not None: self.channel.close() self.channel = None - self.stub = None + self.stub = None @RETRY_DECORATOR - def StoreKpiValues(self, request: KpiValueList) -> Empty: + def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.StoreKpiValues(request) LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: + def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SelectKpiValues(request) LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response))) return response + + @RETRY_DECORATOR + def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore + LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiAlarms(request) + LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 4ea978fafc8d7454d41f64182d553d030215113a..0f57f88219a74108a555cf87e9bdb98999fd5da2 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,18 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json +from datetime import datetime +import logging, grpc, json, queue from typing import Dict from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from confluent_kafka import KafkaError from common.proto.context_pb2 import Empty from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer -from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType - +from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer from prometheus_api_client import PrometheusConnect from prometheus_api_client.utils import parse_datetime @@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): def __init__(self): LOGGER.debug('Init KpiValueApiService') + self.listener_topic = KafkaTopic.ALARMS.value + self.result_queue = queue.Queue() + self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-frontend', + 'auto.offset.reset' : 'latest'}) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext ) -> Empty: @@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): kpi_value = KpiValue() kpi_value.kpi_id.kpi_id = record['metric']['__name__'], kpi_value.timestamp = value[0], - kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value'])) response.kpi_value_list.append(kpi_value) return response def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): - print("--- START -----") - kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid # print("KpiId generated: {:}".format(kpi_id)) - try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) @@ -135,26 +142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) print ("Unable to get KpiDescriptor. Error: {:}".format(e)) - def ConverValueToKpiValueType(self, value): - # Check if the value is an integer (int64) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore + """ + Get Alarms from Kafka return Alrams periodically. + """ + LOGGER.debug('GetKpiAlarms: {:}'.format(request)) + response = KpiAlarms() + + for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): + response.start_timestamp.timestamp = datetime.strptime( + value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.end_timestamp.timestamp = datetime.strptime( + value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.kpi_id.kpi_id.uuid = value['kpi_id'] + for key, threshold in value.items(): + if "THRESHOLD_" in key: + response.alarms[key] = threshold + + yield response + + def StartResponseListener(self, filter_key=None): + """ + Start the Kafka response listener with APScheduler and return key-value pairs periodically. + """ + LOGGER.info("Starting StartResponseListener") + # Schedule the ResponseListener at fixed intervals + self.scheduler.add_job( + self.response_listener, + trigger=IntervalTrigger(seconds=5), + args=[filter_key], + id=f"response_listener_{self.listener_topic}", + replace_existing=True + ) + self.scheduler.start() + LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: - int_value = int(value) - return KpiValueType(int64Val=int_value) - except (ValueError, TypeError): - pass - # Check if the value is a float + while True: + LOGGER.info("entering while...") + key, value = self.result_queue.get() # Wait until a result is available + LOGGER.info("In while true ...") + yield key, value # Yield the result to the calling function + except KeyboardInterrupt: + LOGGER.warning("Listener stopped manually.") + finally: + self.StopListener() + + def response_listener(self, filter_key=None): + """ + Poll Kafka messages and put key-value pairs into the queue. + """ + LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") + + consumer = self.kafka_consumer + consumer.subscribe([self.listener_topic]) + msg = consumer.poll(2.0) + if msg is None: + return + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + return try: - float_value = float(value) - return KpiValueType(floatVal=float_value) - except (ValueError, TypeError): - pass - # Check if the value is a boolean - if value.lower() in ['true', 'false']: - bool_value = value.lower() == 'true' - return KpiValueType(boolVal=bool_value) - # If none of the above, treat it as a string - return KpiValueType(stringVal=value) + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + + def ConverValueToKpiValueType(self, value): + kpi_value_type = KpiValueType() + if isinstance(value, int): + kpi_value_type.int32Val = value + elif isinstance(value, float): + kpi_value_type.floatVal = value + elif isinstance(value, str): + kpi_value_type.stringVal = value + elif isinstance(value, bool): + kpi_value_type.boolVal = value + # Add other checks for different types as needed + return kpi_value_type diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py index d8ad14bd44eebc1e9412cfd5ff2973e6018c95e9..50240c0154deff33dfdbb797cd5e0fca9a05c8ab 100644 --- a/src/kpi_value_api/tests/messages.py +++ b/src/kpi_value_api/tests/messages.py @@ -13,9 +13,16 @@ # limitations under the License. import uuid, time +from common.proto import kpi_manager_pb2 from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList +def create_kpi_id_request(): + _create_kpi_id = kpi_manager_pb2.KpiId() + _create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_id.kpi_id.uuid = str(uuid.uuid4()) + return _create_kpi_id + def create_kpi_value_list(): _create_kpi_value_list = KpiValueList() # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB.