diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto index f42e30550f35c54cb586fa6910359196e55a9e89..bb6afb2e424b646fd8d9a6810c38a8a6863c230f 100644 --- a/proto/analytics_frontend.proto +++ b/proto/analytics_frontend.proto @@ -20,7 +20,7 @@ import "kpi_manager.proto"; //import "kpi_sample_types.proto"; service AnalyticsFrontendService { - rpc StartAnalyzer (Analyzer ) returns (stream AnalyzerAlarms) {} + rpc StartAnalyzer (Analyzer ) returns (AnalyzerId) {} rpc StopAnalyzer (AnalyzerId ) returns (context.Empty) {} rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {} } @@ -51,13 +51,6 @@ message Analyzer { uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch } -message AnalyzerAlarms { - context.Timestamp start_timestamp = 1; - context.Timestamp end_timestamp = 2; - kpi_manager.KpiId kpi_id = 3; - map<string, bool> alarms = 4; -} - message AnalyzerFilter { // Analyzer that fulfill the filter are those that match ALL the following fields. // An empty list means: any value is accepted. diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 3dc59b6ce3e01044e829f264d6645eb3cc94d8be..baa88a8b7de2a06b569cf61bb248b281ce5715c9 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -12,24 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging, grpc, json -import logging, grpc, json, queue - -from datetime import datetime from typing import Dict -from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList, AnalyzerAlarms +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.triggers.interval import IntervalTrigger + LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') @@ -37,38 +32,22 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC') class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') - self.listener_topic = KafkaTopic.ALARMS.value self.db_obj = AnalyzerDB() - self.result_queue = queue.Queue() - self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', - 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartAnalyzer(self, request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore ) -> AnalyzerAlarms: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) - response = AnalyzerAlarms() + response = AnalyzerId() self.db_obj.add_row_to_db( AnalyzerModel.ConvertAnalyzerToRow(request) ) self.PublishStartRequestOnKafka(request) - for alarm_key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid): - response.start_timestamp.timestamp = datetime.strptime( - value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() - response.end_timestamp.timestamp = datetime.strptime( - value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() - response.kpi_id.kpi_id.uuid = value['kpi_id'] - for key, threshold in value.items(): - if "THRESHOLD_" in key: - LOGGER.debug("-- {:} - {:}".format(key, threshold)) - response.alarms[key] = threshold - - yield response + response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid + return response def PublishStartRequestOnKafka(self, analyzer_obj): """ @@ -94,59 +73,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) self.kafka_producer.flush() - def StartResponseListener(self, filter_key=None): - """ - Start the Kafka response listener with APScheduler and return key-value pairs periodically. - """ - LOGGER.info("Starting StartResponseListener") - # Schedule the ResponseListener at fixed intervals - self.scheduler.add_job( - self.response_listener, - trigger=IntervalTrigger(seconds=5), - args=[filter_key], - id=f"response_listener_{self.listener_topic}", - replace_existing=True - ) - self.scheduler.start() - LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") - try: - while True: - LOGGER.info("entering while...") - key, value = self.result_queue.get() # Wait until a result is available - LOGGER.info("In while true ...") - yield key, value # Yield the result to the calling function - except KeyboardInterrupt: - LOGGER.warning("Listener stopped manually.") - finally: - self.StopListener() - - def response_listener(self, filter_key=None): - """ - Poll Kafka messages and put key-value pairs into the queue. - """ - LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") - - consumer = self.kafka_consumer - consumer.subscribe([self.listener_topic]) - msg = consumer.poll(2.0) - if msg is None: - return - elif msg.error(): - if msg.error().code() != KafkaError._PARTITION_EOF: - LOGGER.error(f"Kafka error: {msg.error()}") - return - - try: - key = msg.key().decode('utf-8') if msg.key() else None - if filter_key is not None and key == filter_key: - value = json.loads(msg.value().decode('utf-8')) - LOGGER.info(f"Received key: {key}, value: {value}") - self.result_queue.put((key, value)) - else: - LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") - except Exception as e: - LOGGER.error(f"Error processing Kafka message: {e}") - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore @@ -181,15 +107,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) self.kafka_producer.flush() - self.StopListener() - - def StopListener(self): - """ - Gracefully stop the Kafka listener and the scheduler. - """ - LOGGER.info("Stopping Kafka listener...") - self.scheduler.shutdown() - LOGGER.info("Kafka listener stopped.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self,