    # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    
    import logging, grpc, json, queue
    
    from typing          import Dict
    from confluent_kafka import Consumer as KafkaConsumer
    from confluent_kafka import Producer as KafkaProducer
    from confluent_kafka import KafkaError
    
    from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
    
    from common.proto.context_pb2                 import Empty
    from common.method_wrappers.Decorator         import MetricsPool, safe_and_metered_rpc_method
    from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
    from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
    
    from analytics.database.Analyzer_DB           import AnalyzerDB
    from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
    
    from apscheduler.schedulers.background        import BackgroundScheduler
    from apscheduler.triggers.interval            import IntervalTrigger
    
    LOGGER           = logging.getLogger(__name__)
    METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
    
    
    class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
        def __init__(self):
            LOGGER.info('Init AnalyticsFrontendService')
    
            self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
    
            self.db_obj         = AnalyzerDB()
    
            self.result_queue   = queue.Queue()
            self.scheduler      = BackgroundScheduler()
    
            self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
            # 'auto.offset.reset': 'latest' -> the consumer only sees responses published after it starts.
            self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                                 'group.id'          : 'analytics-frontend',
                                                 'auto.offset.reset' : 'latest'})
    
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def StartAnalyzer(self, 
                           request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
                          ) -> AnalyzerId: # type: ignore
            LOGGER.info ("At Service gRPC message: {:}".format(request))
            response = AnalyzerId()

            # Persist the analyzer first, then hand the start request to the backend via Kafka.
            self.db_obj.add_row_to_db(
                AnalyzerModel.ConvertAnalyzerToRow(request)
            )
            self.PublishStartRequestOnKafka(request)

            response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
            return response
    
    
        def PublishStartRequestOnKafka(self, analyzer_obj):
            """
            Method to generate analyzer request on Kafka.
            """
            analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
            analyzer_to_generate : Dict = {
                "algo_name"       : analyzer_obj.algorithm_name,
                "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
                "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
                "oper_mode"       : analyzer_obj.operation_mode,
                "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
                "window_size"     : analyzer_obj.parameters["window_size"],
                "window_slider"   : analyzer_obj.parameters["window_slider"],
                # "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
            }
            self.kafka_producer.produce(
                KafkaTopic.ANALYTICS_REQUEST.value,
                key      = analyzer_uuid,
                value    = json.dumps(analyzer_to_generate),
                callback = self.delivery_callback
            )
            LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
            self.kafka_producer.flush()
    
            # self.StartResponseListener(analyzer_uuid)
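
            # Illustrative sketch of the JSON published above. The values are
            # invented for documentation purposes; the real ones come from the
            # Analyzer proto received by StartAnalyzer:
            #
            #   {
            #     "algo_name"     : "some_algorithm",
            #     "input_kpis"    : ["6e22f180-ba28-4641-b190-2287bf448888"],
            #     "output_kpis"   : ["1e22f180-ba28-4641-b190-2287bf446666"],
            #     "oper_mode"     : 1,
            #     "thresholds"    : {"avg_value": [20, 30]},
            #     "window_size"   : "60 seconds",
            #     "window_slider" : "30 seconds"
            #   }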
    
        def StartResponseListener(self, filter_key=None):
            """
            Start the Kafka response listener with APScheduler and return key-value pairs periodically.
            """
            LOGGER.info("Starting StartResponseListener")
            # Schedule the ResponseListener at fixed intervals
            self.scheduler.add_job(
                self.response_listener,
                trigger=IntervalTrigger(seconds=5),
                args=[filter_key], 
                id=f"response_listener_{self.listener_topic}",
                replace_existing=True
            )
            self.scheduler.start()
            LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
            try:
                while True:
                    key, value = self.result_queue.get()  # block until a result is available
                    yield key, value                      # yield the result to the calling function
            except KeyboardInterrupt:
                LOGGER.warning("Listener stopped manually.")
            finally:
                self.StopListener()
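
        # Illustrative usage sketch (assumption: nothing in this file consumes
        # the generator directly; a caller could iterate it like this):
        #
        #   servicer = AnalyticsFrontendServiceServicerImpl()
        #   for key, value in servicer.StartResponseListener(analyzer_uuid):
        #       handle_result(key, value)   # hypothetical handler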
    
        def response_listener(self, filter_key=None):
            """
            Poll the Kafka response topic once and enqueue the key-value pairs whose key matches filter_key.
            """
            LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
    
            consumer = self.kafka_consumer
            consumer.subscribe([self.listener_topic])
            msg = consumer.poll(2.0)
            if msg is None:
                return
            elif msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    LOGGER.error(f"Kafka error: {msg.error()}")
                return
    
            try:
                key = msg.key().decode('utf-8') if msg.key() else None
                if filter_key is not None and key == filter_key:
                    value = json.loads(msg.value().decode('utf-8'))
                    LOGGER.info(f"Received key: {key}, value: {value}")
                    self.result_queue.put((key, value))
                else:
                    LOGGER.info(f"Skipping message with unmatched key: {key}")
                    # value = json.loads(msg.value().decode('utf-8')) # Added for debugging
                    # self.result_queue.put((filter_key, value))             # Added for debugging
            except Exception as e:
                LOGGER.error(f"Error processing Kafka message: {e}")
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def StopAnalyzer(self, 
                          request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
                         ) -> Empty:  # type: ignore
            LOGGER.info ("At Service gRPC message: {:}".format(request))
    
            try:
                analyzer_id_to_delete = request.analyzer_id.uuid
                self.db_obj.delete_db_row_by_id(
                    AnalyzerModel, "analyzer_id", analyzer_id_to_delete
                )
                self.PublishStopRequestOnKafka(analyzer_id_to_delete)
    
            except Exception as e:
                LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e))
    
            return Empty()
    
        def PublishStopRequestOnKafka(self, analyzer_uuid):
            """
            Method to generate stop analyzer request on Kafka.
            """
            # analyzer_uuid = analyzer_id.analyzer_id.uuid
            # A stop request reuses the request topic: the analyzer key is
            # republished with null/empty fields, which the backend is expected
            # to treat as a teardown order.
            analyzer_to_stop : Dict = {
                "algo_name"   : None,
                "input_kpis"  : [],
                "output_kpis" : [],
                "oper_mode"   : None
            }
            self.kafka_producer.produce(
                KafkaTopic.ANALYTICS_REQUEST.value,
                key      = analyzer_uuid,
                value    = json.dumps(analyzer_to_stop),
                callback = self.delivery_callback
            )
            LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
            self.kafka_producer.flush()
    
            self.StopListener()
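
            # For illustration (invented example), a backend consumer would see
            # something like the following and match the stop request by key:
            #
            #   key   = "6e22f180-ba28-4641-b190-2287bf448888"
            #   value = {"algo_name": null, "input_kpis": [], "output_kpis": [], "oper_mode": null}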
    
        def StopListener(self):
            """
            Gracefully stop the Kafka listener and the scheduler.
            """
            LOGGER.info("Stopping Kafka listener...")
            # shutdown() raises SchedulerNotRunningError if the scheduler was
            # never started (e.g. StopAnalyzer called without an active
            # listener), so guard on the running flag.
            if self.scheduler.running:
                self.scheduler.shutdown()
            LOGGER.info("Kafka listener stopped.")
    
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
        def SelectAnalyzers(self, 
                             filter : AnalyzerFilter, grpc_context : grpc.ServicerContext # type: ignore
                            ) -> AnalyzerList:  # type: ignore
            LOGGER.info("At Service gRPC message: {:}".format(filter))

            response = AnalyzerList()
    
            try:
                rows = self.db_obj.select_with_filter(AnalyzerModel, filter)
                try:
                    for row in rows:
                        response.analyzer_list.append(
                            AnalyzerModel.ConvertRowToAnalyzer(row)
                        )
                    return response
                except Exception as e:
                    LOGGER.error('Unable to process filter response: {:}'.format(e))
            except Exception as e:
                LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
            # Return the (possibly empty) list instead of None so callers always get a valid response.
            return response
    
    
        def delivery_callback(self, err, msg):
            if err:
                LOGGER.warning('Message delivery failed: {:}'.format(err))
            # else:
            #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
            #     print('Message delivered to topic {:}'.format(msg.topic()))
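
    # ------------------------------------------------------------------------
    # Illustrative wiring sketch (assumption: not part of this file; TFS
    # normally starts this servicer through its own service wrapper). It only
    # shows how the servicer could be attached to a bare grpc.server for local
    # experimentation; the port number is hypothetical.
    #
    #   import grpc
    #   from concurrent import futures
    #   from common.proto.analytics_frontend_pb2_grpc import (
    #       add_AnalyticsFrontendServiceServicer_to_server)
    #
    #   server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    #   add_AnalyticsFrontendServiceServicer_to_server(
    #       AnalyticsFrontendServiceServicerImpl(), server)
    #   server.add_insecure_port('[::]:30080')   # hypothetical port
    #   server.start()
    #   server.wait_for_termination()
    # ------------------------------------------------------------------------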