diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index 66661bc6635a641c882f4e00c8d134350ff74f4d..f42e30550f35c54cb586fa6910359196e55a9e89 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -17,11 +17,10 @@ package analytics_frontend; import "context.proto"; import "kpi_manager.proto"; -import "kpi_value_api.proto"; //import "kpi_sample_types.proto"; service AnalyticsFrontendService { - rpc StartAnalyzer (Analyzer ) returns (stream kpi_value_api.KpiValue) {} + rpc StartAnalyzer (Analyzer ) returns (stream AnalyzerAlarms) {} rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {} rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {} } @@ -52,6 +51,13 @@ message Analyzer { uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch } +message AnalyzerAlarms { + context.Timestamp start_timestamp = 1; + context.Timestamp end_timestamp = 2; + kpi_manager.KpiId kpi_id = 3; + map<string, bool> alarms = 4; +} + message AnalyzerFilter { // Analyzer that fulfill the filter are those that match ALL the following fields. // An empty list means: any value is accepted. diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index d9495bc4eedb5e4f72f4bde27ab2653f34ccaf13..3dc59b6ce3e01044e829f264d6645eb3cc94d8be 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -21,11 +21,10 @@ from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer from confluent_kafka import KafkaError -from common.proto.kpi_value_api_pb2 import KpiValueType, KpiValue from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList, AnalyzerAlarms from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel @@ -38,7 +37,7 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') - self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value + self.listener_topic = KafkaTopic.ALARMS.value self.db_obj = AnalyzerDB() self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() @@ -50,21 +49,26 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiValue: # type: ignore + ) -> AnalyzerAlarms: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) - response = KpiValue() + response = AnalyzerAlarms() self.db_obj.add_row_to_db( AnalyzerModel.ConvertAnalyzerToRow(request) ) self.PublishStartRequestOnKafka(request) - for key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid): - # LOGGER.debug("Response from ---> {:}, {:}".format(key, value)) + for alarm_key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid): + response.start_timestamp.timestamp = datetime.strptime( + value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + response.end_timestamp.timestamp = datetime.strptime( + value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() response.kpi_id.kpi_id.uuid = value['kpi_id'] - response.timestamp.timestamp = datetime.strptime(value['time_stamp'], "%Y-%m-%dT%H:%M:%SZ").timestamp() - response.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value'])) + for key, threshold in value.items(): + if "THRESHOLD_" in key: + LOGGER.debug("-- {:} - {:}".format(key, threshold)) + response.alarms[key] = threshold + yield response - # response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid def PublishStartRequestOnKafka(self, analyzer_obj): """ @@ -89,8 +93,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) self.kafka_producer.flush() - - # self.StartResponseListener(analyzer_uuid) def StartResponseListener(self, filter_key=None): """ @@ -141,9 +143,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info(f"Received key: {key}, value: {value}") self.result_queue.put((key, value)) else: - LOGGER.info(f"Skipping message with unmatched key: {key}") - # value = json.loads(msg.value().decode('utf-8')) # Added for debugging - # self.result_queue.put((filter_key, value)) # Added for debugging + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") except Exception as e: LOGGER.error(f"Error processing Kafka message: {e}") @@ -210,19 +210,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): except Exception as e: LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e)) - def ConverValueToKpiValueType(self, value): - kpi_value_type = KpiValueType() - if isinstance(value, int): # Check for integer type - kpi_value_type.int32Val = value # Assuming int32Val for example - elif isinstance(value, float): # Check for float type - kpi_value_type.floatVal = value - elif isinstance(value, str): # Check for string type - kpi_value_type.stringVal = value - elif isinstance(value, bool): # Check for boolean type - kpi_value_type.boolVal = value - # Add other checks for different types as needed - return kpi_value_type - def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 4424bab103ecc28aa02b58fa12646fd26754765b..9f5c040f366b02a6fea27e8e8696c0c118ece05a 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -26,6 +26,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, from common.tools.kafka.Variables import KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,