Commit a833bdbd authored by Lluis Gifre Renom
Browse files

Merge branch 'feat/247-cttc-analytics-module-enhancements' into 'develop'

Resolve "(CTTC) Analytics Module Enhancements"

See merge request !318
parents bd059d3d bb6e6b8a
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -18,8 +18,10 @@ PROJECTDIR=`pwd`

cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"

python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
    analytics/frontend/tests/test_frontend.py
+42 −32
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer
from confluent_kafka import KafkaError
from confluent_kafka import KafkaError, KafkaException
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from analytics.backend.service.Streamer import DaskStreamer
@@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService):
    """
    AnalyticsBackendService class is responsible for handling the requests from the AnalyticsFrontendService.
    It listens to the Kafka topic for the requests and starts/stops the DaskStreamer accordingly.
    It listens to the Kafka topic for the requests to start and stop the Streamer accordingly.
    It also initializes the Kafka producer and Dask cluster for the streamer.
    """
    def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1
@@ -62,34 +62,36 @@ class AnalyticsBackendService(GenericGrpcService):
        listener for requests on Kafka topic.
        """
        LOGGER.info("Request Listener is initiated ...")
        # print      ("Request Listener is initiated ...")
        consumer = self.request_consumer
        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
        while True:
            receive_msg = consumer.poll(2.0)
            if receive_msg is None:
            message = consumer.poll(2.0)
            if message is None:
                continue
            elif receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
            elif message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        LOGGER.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                        break
                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                        LOGGER.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
                        continue
                    elif message.error():
                        raise KafkaException(message.error())
            try:
                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                analyzer_uuid = receive_msg.key().decode('utf-8')
                analyzer      = json.loads(message.value().decode('utf-8'))
                analyzer_uuid = message.key().decode('utf-8')
                LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    if self.StopStreamer(analyzer_uuid):
                        LOGGER.info("Dask Streamer stopped.")
                    else:
                        LOGGER.error("Failed to stop Dask Streamer.")
                        LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...")
                else:
                    if self.StartStreamer(analyzer_uuid, analyzer):
                        LOGGER.info("Dask Streamer started.")
                    else:
                        LOGGER.error("Failed to start Dask Streamer.")
                        LOGGER.warning("Failed to start Dask Streamer.")
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

@@ -107,7 +109,8 @@ class AnalyticsBackendService(GenericGrpcService):
                input_kpis        = analyzer['input_kpis'        ],
                output_kpis       = analyzer['output_kpis'       ],
                thresholds        = analyzer['thresholds'        ],
                batch_size        = analyzer['batch_size' ],
                batch_size        = analyzer['batch_size_min'    ],
                batch_duration    = analyzer['batch_duration_min'],
                window_size       = analyzer['window_size'       ],
                cluster_instance  = self.cluster,
                producer_instance = self.central_producer,
@@ -116,14 +119,17 @@ class AnalyticsBackendService(GenericGrpcService):
            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")

            # Stop the streamer after the given duration
            if analyzer['duration'] > 0:
            duration = analyzer['duration']
            if duration > 0:
                def stop_after_duration():
                    time.sleep(analyzer['duration'])
                    LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
                    time.sleep(duration)
                    LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
                    if not self.StopStreamer(analyzer_uuid):
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")

                duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
                duration_thread = threading.Thread(
                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
                    )
                duration_thread.start()

            self.active_streamers[analyzer_uuid] = streamer
@@ -139,7 +145,7 @@ class AnalyticsBackendService(GenericGrpcService):
        try:
            if analyzer_uuid not in self.active_streamers:
                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                return False
                return True
            LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
            streamer = self.active_streamers[analyzer_uuid]
            streamer.stop()
@@ -147,11 +153,11 @@ class AnalyticsBackendService(GenericGrpcService):
            del self.active_streamers[analyzer_uuid]
            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
            return True
        except Exception as e:
            LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
        except:
            LOGGER.exception("Failed to stop Dask Streamer.")
            return False

    def close(self):        # TODO: Is this function needed?
    def close(self):
        """
        Close the producer and cluster cleanly.
        """
@@ -159,11 +165,15 @@ class AnalyticsBackendService(GenericGrpcService):
            try:
                self.central_producer.flush()
                LOGGER.info("Kafka producer flushed and closed.")
            except Exception as e:
                LOGGER.error(f"Error closing Kafka producer: {e}")
            except:
                LOGGER.exception("Error closing Kafka producer")
        if self.cluster:
            try:
                self.cluster.close()
                LOGGER.info("Dask cluster closed.")
            except Exception as e:
                LOGGER.error(f"Error closing Dask cluster: {e}")
            except:
                LOGGER.exception("Error closing Dask cluster")

    def stop(self):
        self.close()
        return super().stop()
+8 −3
Original line number Diff line number Diff line
@@ -95,6 +95,8 @@ def aggregation_handler(
            "sum"     : ('kpi_value', 'sum'),
        }

        results = []
        
        # Process each KPI-specific task parameter
        for kpi_index, kpi_id in enumerate(input_kpi_list):

@@ -122,10 +124,13 @@ def aggregation_handler(
                agg_df['kpi_id'] = output_kpi_list[kpi_index]

                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
                result = threshold_handler(key, agg_df, kpi_task_parameters)
                record = threshold_handler(key, agg_df, kpi_task_parameters)

                return result.to_dict(orient='records')
                results.extend(record.to_dict(orient='records'))
            else:
                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
                continue
        if results:
            return results
        else:
            return []
+17 −12
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
class DaskStreamer(threading.Thread):
    def __init__(self, key, input_kpis, output_kpis, thresholds, 
                 batch_size        = 5, 
                 batch_duration    = None,
                 window_size       = None,
                 cluster_instance  = None,
                 producer_instance = AnalyzerHelper.initialize_kafka_producer()
@@ -38,8 +39,9 @@ class DaskStreamer(threading.Thread):
        self.input_kpis     = input_kpis
        self.output_kpis    = output_kpis
        self.thresholds     = thresholds
        self.window_size = window_size
        self.window_size    = window_size      # TODO: Not implemented
        self.batch_size     = batch_size
        self.batch_duration = batch_duration
        self.running        = True
        self.batch          = []

@@ -65,13 +67,16 @@ class DaskStreamer(threading.Thread):
                if not self.client:
                    logger.warning("Dask client is not running. Exiting loop.")
                    break
                message = self.consumer.poll(timeout=2.0)
                message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    continue
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                    elif message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                        logger.error(f"Subscribed topic {message.topic()} does not exist. May be topic does not have any messages.")
                        continue
                    elif message.error():
                        raise KafkaException(message.error())
                else:
@@ -83,7 +88,7 @@ class DaskStreamer(threading.Thread):
                    self.batch.append(value)

                # Window size has a precedence over batch size
                if self.window_size is None:
                if self.batch_duration is None:
                    if len(self.batch) >= self.batch_size:  # If batch size is not provided, process continue with the default batch size
                        logger.info(f"Processing based on batch size {self.batch_size}.")
                        self.task_handler_selector()
@@ -91,8 +96,8 @@ class DaskStreamer(threading.Thread):
                else:
                    # Process based on window size
                    current_time = time.time()
                    if (current_time - last_batch_time) >= self.window_size and self.batch:
                        logger.info(f"Processing based on window size {self.window_size}.")
                    if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                        logger.info(f"Processing based on window size {self.batch_duration}.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time
+5 −2
Original line number Diff line number Diff line
@@ -32,13 +32,16 @@ def get_thresholds():
    }

def get_duration():
    return 40
    return 90

def get_batch_duration():
    return 30

def get_windows_size():
    return None

def get_batch_size():
    return 10
    return 5

def get_interval():
    return 5
Loading