# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json, queue
from typing          import Dict
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
from common.proto.context_pb2                 import Empty
from common.method_wrappers.Decorator         import MetricsPool, safe_and_metered_rpc_method
from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB           import AnalyzerDB
from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
from apscheduler.schedulers.background        import BackgroundScheduler
from apscheduler.triggers.interval            import IntervalTrigger
LOGGER           = logging.getLogger(__name__)
METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
    def __init__(self):
        LOGGER.info('Init AnalyticsFrontendService')
        self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
        self.db_obj         = AnalyzerDB()
        self.result_queue   = queue.Queue()
        self.scheduler      = BackgroundScheduler()
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'analytics-frontend',
                                             'auto.offset.reset' : 'latest'})
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StartAnalyzer(self, 
                       request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
                      ) -> AnalyzerId: # type: ignore
        LOGGER.info ("At Service gRPC message: {:}".format(request))
        response = AnalyzerId()
        self.db_obj.add_row_to_db(
            AnalyzerModel.ConvertAnalyzerToRow(request)
        )
        
        self.PublishStartRequestOnKafka(request)

        response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
        return response
    def PublishStartRequestOnKafka(self, analyzer_obj):
        """
        Method to generate analyzer request on Kafka.
        """
        analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
        analyzer_to_generate : Dict = {
            "algo_name"       : analyzer_obj.algorithm_name,
            "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
            "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
            "oper_mode"       : analyzer_obj.operation_mode,
            "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
            "window_size"     : analyzer_obj.parameters["window_size"],
            "window_slider"   : analyzer_obj.parameters["window_slider"],
            # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
            key      = analyzer_uuid,
            value    = json.dumps(analyzer_to_generate),
            callback = self.delivery_callback
        )
        LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
        self.kafka_producer.flush()
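    # For reference, a start request serialized by the method above lands on the
    # ANALYTICS_REQUEST topic as JSON roughly like the sketch below. The UUIDs and
    # parameter values are illustrative, not taken from a real run:
    #
    #   {
    #     "algo_name"     : "Test_Aggregate_and_Threshold",
    #     "input_kpis"    : ["6e22f180-ba28-4641-b190-2287bf448888"],
    #     "output_kpis"   : ["1e22f180-ba28-4641-b190-2287bf446666"],
    #     "oper_mode"     : 1,
    #     "thresholds"    : {"avg_value": [20, 30]},
    #     "window_size"   : "60 seconds",
    #     "window_slider" : "30 seconds"
    #   }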
    def StartResponseListener(self, filter_key=None):
        """
        Start the Kafka response listener with APScheduler and return key-value pairs periodically.
        """
        LOGGER.info("Starting StartResponseListener")
        # Schedule the ResponseListener at fixed intervals
        self.scheduler.add_job(
            self.response_listener,
            trigger=IntervalTrigger(seconds=5),
            args=[filter_key], 
            id=f"response_listener_{self.listener_topic}",
            replace_existing=True
        )
        self.scheduler.start()
        LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
        try:
            while True:
                key, value = self.result_queue.get()  # Block until a result is available
                yield key, value                      # Yield the result to the calling function
        except KeyboardInterrupt:
            LOGGER.warning("Listener stopped manually.")
        finally:
            self.StopListener()
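    # Usage sketch (an assumption for illustration, not part of the gRPC API): a
    # caller such as a test harness could consume the generator and stop after the
    # first response matching a given analyzer UUID, e.g.:
    #
    #   servicer = AnalyticsFrontendServiceServicerImpl()
    #   for key, value in servicer.StartResponseListener(filter_key=analyzer_uuid):
    #       LOGGER.info('Got response for {:}: {:}'.format(key, value))
    #       break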
    def response_listener(self, filter_key=None):
        """
        Poll Kafka messages and put key-value pairs into the queue.
        """
        LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
        consumer = self.kafka_consumer
        consumer.subscribe([self.listener_topic])
        msg = consumer.poll(2.0)
        if msg is None:
            return
        elif msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                LOGGER.error(f"Kafka error: {msg.error()}")
            return
        try:
            key = msg.key().decode('utf-8') if msg.key() else None
            if filter_key is not None and key == filter_key:
                value = json.loads(msg.value().decode('utf-8'))
                LOGGER.info(f"Received key: {key}, value: {value}")
                self.result_queue.put((key, value))
            else:
                LOGGER.info(f"Skipping message with unmatched key: {key}")
                # value = json.loads(msg.value().decode('utf-8')) # Added for debugging
                # self.result_queue.put((filter_key, value))             # Added for debugging
        except Exception as e:
            LOGGER.error(f"Error processing Kafka message: {e}")
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StopAnalyzer(self, 
                      request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
                     ) -> Empty:  # type: ignore
        LOGGER.info ("At Service gRPC message: {:}".format(request))
        try:
            analyzer_id_to_delete = request.analyzer_id.uuid
            self.db_obj.delete_db_row_by_id(
                AnalyzerModel, "analyzer_id", analyzer_id_to_delete
            )
            self.PublishStopRequestOnKafka(analyzer_id_to_delete)
        except Exception as e:
            LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e))
        return Empty()

    def PublishStopRequestOnKafka(self, analyzer_uuid):
        """
        Method to generate stop analyzer request on Kafka.
        """
        analyzer_to_stop : Dict = {
            "algo_name"   : None,
            "input_kpis"  : [],
            "output_kpis" : [],
            "oper_mode"   : None
        }
        self.kafka_producer.produce(
            KafkaTopic.ANALYTICS_REQUEST.value,
            key      = analyzer_uuid,
            value    = json.dumps(analyzer_to_stop),
            callback = self.delivery_callback
        )
        LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
        self.kafka_producer.flush()
    def StopListener(self):
        """
        Gracefully stop the Kafka listener and the scheduler.
        """
        LOGGER.info("Stopping Kafka listener...")
        self.scheduler.shutdown()
        LOGGER.info("Kafka listener stopped.")
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectAnalyzers(self, 
                         filter : AnalyzerFilter, grpc_context : grpc.ServicerContext # type: ignore
                        ) -> AnalyzerList: # type: ignore
        LOGGER.info("At Service gRPC message: {:}".format(filter))
        response = AnalyzerList()
        try:
            rows = self.db_obj.select_with_filter(AnalyzerModel, filter)
            try:
                for row in rows:
                    response.analyzer_list.append(
                        AnalyzerModel.ConvertRowToAnalyzer(row)
                    )
                return response
            except Exception as e:
                LOGGER.error('Unable to process filter response: {:}'.format(e))
        except Exception as e:
            LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
       
    def delivery_callback(self, err, msg):
        if err:
            LOGGER.error('Message delivery failed: {:}'.format(err))
        else:
            LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
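# Client-side sketch (illustrative only; the channel address and port are
# assumptions): the gRPC entry points above are reached through the stub that
# grpc generates alongside the servicer, e.g.:
#
#   import grpc, uuid
#   from common.proto.analytics_frontend_pb2      import Analyzer
#   from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceStub
#
#   channel  = grpc.insecure_channel('localhost:30050')   # hypothetical address/port
#   stub     = AnalyticsFrontendServiceStub(channel)
#   analyzer = Analyzer()
#   analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
#   analyzer_id = stub.StartAnalyzer(analyzer)             # returns an AnalyzerId
#   stub.StopAnalyzer(analyzer_id)                         # returns Empty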