Commit ad695f99 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes to Analytics Frontend:

- Removed `AnalyzerAlarms` from the proto file.
- Moved Alarms functionality from Analytics Frontend to KpiValueAPI.
parent 617b5666
Loading
Loading
Loading
Loading
+1 −8
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ import "kpi_manager.proto";
//import "kpi_sample_types.proto";

service AnalyticsFrontendService {
  rpc StartAnalyzer  (Analyzer      ) returns (stream AnalyzerAlarms) {}
  rpc StartAnalyzer  (Analyzer      ) returns (AnalyzerId)    {}
  rpc StopAnalyzer   (AnalyzerId    ) returns (context.Empty) {}
  rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
}
@@ -51,13 +51,6 @@ message Analyzer {
  uint64                     batch_max_size       = 11; // ..., max number of samples collected to execute the batch
}

message AnalyzerAlarms {
  context.Timestamp start_timestamp = 1;
  context.Timestamp end_timestamp   = 2;
  kpi_manager.KpiId kpi_id          = 3;
  map<string, bool> alarms          = 4;
}

message AnalyzerFilter {
  // Analyzer that fulfill the filter are those that match ALL the following fields.
  // An empty list means: any value is accepted.
+6 −89
Original line number Diff line number Diff line
@@ -12,24 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, grpc, json

import logging, grpc, json, queue

from datetime        import datetime
from typing          import Dict
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError

from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
from common.proto.context_pb2                 import Empty
from common.method_wrappers.Decorator         import MetricsPool, safe_and_metered_rpc_method
from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList, AnalyzerAlarms
from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB           import AnalyzerDB
from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
from apscheduler.schedulers.background        import BackgroundScheduler
from apscheduler.triggers.interval            import IntervalTrigger


LOGGER           = logging.getLogger(__name__)
METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
@@ -37,38 +32,22 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
    def __init__(self):
        LOGGER.info('Init AnalyticsFrontendService')
        self.listener_topic = KafkaTopic.ALARMS.value
        self.db_obj         = AnalyzerDB()
        self.result_queue   = queue.Queue()
        self.scheduler      = BackgroundScheduler()
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                            'group.id'           : 'analytics-frontend',
                                            'auto.offset.reset'  : 'latest'})

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StartAnalyzer(self, 
                       request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
                      ) -> AnalyzerAlarms: # type: ignore
        LOGGER.info ("At Service gRPC message: {:}".format(request))
        response = AnalyzerAlarms()
        response = AnalyzerId()

        self.db_obj.add_row_to_db(
            AnalyzerModel.ConvertAnalyzerToRow(request)
        )
        self.PublishStartRequestOnKafka(request)
        for alarm_key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid):
            response.start_timestamp.timestamp = datetime.strptime(
                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            response.end_timestamp.timestamp = datetime.strptime(
                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            response.kpi_id.kpi_id.uuid  = value['kpi_id']
            for key, threshold in value.items():
                if "THRESHOLD_" in key:
                    LOGGER.debug("-- {:} - {:}".format(key, threshold))
                    response.alarms[key] = threshold

            yield response
        response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
        return response

    def PublishStartRequestOnKafka(self, analyzer_obj):
        """
@@ -94,59 +73,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
        LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
        self.kafka_producer.flush()

    def StartResponseListener(self, filter_key=None):
        """
        Start the Kafka response listener with APScheduler and return key-value pairs periodically.
        """
        LOGGER.info("Starting StartResponseListener")
        # Schedule the ResponseListener at fixed intervals
        self.scheduler.add_job(
            self.response_listener,
            trigger=IntervalTrigger(seconds=5),
            args=[filter_key], 
            id=f"response_listener_{self.listener_topic}",
            replace_existing=True
        )
        self.scheduler.start()
        LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
        try:
            while True:
                LOGGER.info("entering while...")
                key, value = self.result_queue.get()  # Wait until a result is available
                LOGGER.info("In while true ...")
                yield key, value  # Yield the result to the calling function
        except KeyboardInterrupt:
            LOGGER.warning("Listener stopped manually.")
        finally:
            self.StopListener()

    def response_listener(self, filter_key=None):
        """
        Poll Kafka messages and put key-value pairs into the queue.
        """
        LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")

        consumer = self.kafka_consumer
        consumer.subscribe([self.listener_topic])
        msg = consumer.poll(2.0)
        if msg is None:
            return
        elif msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                LOGGER.error(f"Kafka error: {msg.error()}")
            return

        try:
            key = msg.key().decode('utf-8') if msg.key() else None
            if filter_key is not None and key == filter_key:
                value = json.loads(msg.value().decode('utf-8'))
                LOGGER.info(f"Received key: {key}, value: {value}")
                self.result_queue.put((key, value))
            else:
                LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
        except Exception as e:
            LOGGER.error(f"Error processing Kafka message: {e}")

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StopAnalyzer(self, 
                      request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
@@ -181,15 +107,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
        )
        LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
        self.kafka_producer.flush()
        self.StopListener()

    def StopListener(self):
        """
        Gracefully stop the Kafka listener and the scheduler.
        """
        LOGGER.info("Stopping Kafka listener...")
        self.scheduler.shutdown()
        LOGGER.info("Kafka listener stopped.")

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectAnalyzers(self,