Commit e7fb84b8 authored by Georgios P. Katsikas
Browse files

Merge branch 'pr-analytics' into 'develop'

fix: analytics handlers invoked correctly

See merge request !429
parents 9134d0c1 d7608701
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ enum AnalyzerOperationMode {
message Analyzer {
  AnalyzerId                 analyzer_id          = 1;
  string                     algorithm_name       = 2;  // The algorithm to be executed
  float                      duration_s           = 3;  // Termiate the data analytics thread after duration (seconds); 0 = infinity time
  float                      duration_s           = 3;  // Terminate the data analytics thread after duration (seconds); 0 = infinity time
  repeated kpi_manager.KpiId input_kpi_ids        = 4;  // The KPI Ids to be processed by the analyzer
  repeated kpi_manager.KpiId output_kpi_ids       = 5;  // The KPI Ids produced by the analyzer
  AnalyzerOperationMode      operation_mode       = 6;  // Operation mode of the analyzer
+21 −22
Original line number Diff line number Diff line
@@ -26,7 +26,6 @@ from common.Settings import get_service_port_grpc
from analytics.backend.service.Streamer import DaskStreamer
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper


LOGGER = logging.getLogger(__name__)

class AnalyticsBackendService(GenericGrpcService):
@@ -80,18 +79,18 @@ class AnalyticsBackendService(GenericGrpcService):
            try:
                analyzer = json.loads(message.value().decode('utf-8'))
                analyzer_uuid = message.key().decode('utf-8')
                LOGGER.info('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
                LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))

                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                    if self.StopStreamer(analyzer_uuid):
                        LOGGER.info("Dask Streamer stopped.")
                        LOGGER.info("Dask Streamer stopped")
                    else:
                        LOGGER.warning("Failed to stop Dask Streamer. May be already terminated...")
                else:
                    if self.StartStreamer(analyzer_uuid, analyzer):
                        LOGGER.info("Dask Streamer started.")
                        LOGGER.info("Dask Streamer started")
                    else:
                        LOGGER.warning("Failed to start Dask Streamer.")
                        LOGGER.warning("Failed to start Dask Streamer")
            except Exception as e:
                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))

@@ -103,6 +102,7 @@ class AnalyticsBackendService(GenericGrpcService):
        if analyzer_uuid in self.active_streamers:
            LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
            return False
        LOGGER.info(f"Start Streamer for Analyzer:\n{analyzer}")
        try:
            streamer = DaskStreamer(
                key               = analyzer_uuid,
@@ -116,7 +116,7 @@ class AnalyticsBackendService(GenericGrpcService):
                producer_instance = self.central_producer,
            )
            streamer.start()
            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
            LOGGER.info(f"Streamer started with analyzer ID: {analyzer_uuid}")

            # Stop the streamer after the given duration
            duration = analyzer['duration']
@@ -125,11 +125,10 @@ class AnalyticsBackendService(GenericGrpcService):
                    time.sleep(duration)
                    LOGGER.warning(f"Execution duration ({duration}) completed of Analyzer: {analyzer_uuid}")
                    if not self.StopStreamer(analyzer_uuid):
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may already be terminated")

                duration_thread = threading.Thread(
                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}"
                    )
                    target=stop_after_duration, daemon=True, name=f"stop_after_duration_{analyzer_uuid}")
                duration_thread.start()

            self.active_streamers[analyzer_uuid] = streamer
@@ -151,10 +150,10 @@ class AnalyticsBackendService(GenericGrpcService):
            streamer.stop()
            streamer.join()
            del self.active_streamers[analyzer_uuid]
            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been successfully terminated")
            return True
        except:
            LOGGER.exception("Failed to stop Dask Streamer.")
            LOGGER.exception("Failed to stop Dask Streamer")
            return False

    def close(self):
@@ -164,13 +163,13 @@ class AnalyticsBackendService(GenericGrpcService):
        if self.central_producer:
            try:
                self.central_producer.flush()
                LOGGER.info("Kafka producer flushed and closed.")
                LOGGER.info("Kafka producer flushed and closed")
            except:
                LOGGER.exception("Error closing Kafka producer")
        if self.cluster:
            try:
                self.cluster.close()
                LOGGER.info("Dask cluster closed.")
                LOGGER.info("Dask cluster closed")
            except:
                LOGGER.exception("Error closing Dask cluster")

+118 −89
Original line number Diff line number Diff line
@@ -13,16 +13,15 @@
# limitations under the License.

import logging
from enum import Enum
import pandas as pd
from enum import Enum
from collections import defaultdict

logger = logging.getLogger(__name__)


class Handlers(Enum):
    AGGREGATION_HANDLER = "AggregationHandler"
    AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne"
    AGGREGATION_HANDLER_MANY_TO_ONE = "AggregationHandlerManyToOne"
    UNSUPPORTED_HANDLER = "UnsupportedHandler"

    @classmethod
@@ -30,23 +29,30 @@ class Handlers(Enum):
        return handler_name in cls._value2member_map_

def select_handler(handler_name):
    if handler_name == "AggregationHandler":
        return aggregation_handler
    elif handler_name == "AggregationHandlerThreeToOne":
        return aggregation_handler_three_to_one
    else:
        return "UnsupportedHandler"
    try:
        logger.info(f"Aggregation handler: {handler_name}")
        handler_enum = Handlers(handler_name)  # auto-validates
        return HANDLER_FUNCTIONS[handler_enum]
    except (ValueError, KeyError):
        logger.error("Unsupported handler")
        raise ValueError(f"Unsupported handler: {handler_name}")

def transform_data(record : pd.DataFrame, value_key : str) -> pd.DataFrame:
    new_value_key = 'value'
    return record.rename(columns={value_key: new_value_key})

def find(data, type, value):
    return next((item for item in data if item[type] == value), None)

# This method is top-level and should not be part of the class due to serialization issues.
def threshold_handler(key, aggregated_df, thresholds):
    """
    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
    Apply thresholds (value_threshold_low and value_threshold_high) based on the thresholds dictionary
    on the aggregated DataFrame.

    Args:
        key (str): Key for the aggregated DataFrame.
        aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
        thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th).
        thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (value_threshold_low, value_threshold_high).

    Returns:
        pd.DataFrame: DataFrame with additional threshold columns.
@@ -57,30 +63,33 @@ def threshold_handler(key, aggregated_df, thresholds):
            logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
            continue

        logger.info(f"[Threshold] Metric: {metric_name}")
        logger.info(f"[Threshold] Value range: {threshold_values}")

        # Ensure the threshold values are valid (check for tuple specifically)
        if isinstance(threshold_values, list) and len(threshold_values) == 2:
            fail_th, raise_th = threshold_values
            fall_th, raise_th = threshold_values
            
            # Add threshold columns with updated naming
            aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
            aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th
            aggregated_df[f"value_threshold_low"]  = aggregated_df[metric_name] < fall_th
            aggregated_df[f"value_threshold_high"] = aggregated_df[metric_name] > raise_th
        else:
            logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
    logger.info(f"[AggregatedDF]: {aggregated_df}")
    return aggregated_df

def aggregation_handler(
        batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
    ):
def aggregation_handler(key, batch, input_kpi_list, output_kpi_list, thresholds):
    """
      Process a batch of data and calculate aggregated values for each input KPI
      and maps them to the output KPIs. """
      and maps them to the output KPIs.
    """

    logger.info(f"({batch_type_name}) Processing batch for key: {key}")
    logger.info("AggregationHandler starts")
    if not batch:
        logger.info("Empty batch received. Skipping processing.")
        logger.warning("Empty batch received. Skipping processing")
        return []
    else:
        logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")

    logger.info(f"Processing {len(batch)} records for key: {key}")

    # Convert data into a DataFrame
    df = pd.DataFrame(batch)
@@ -89,7 +98,7 @@ def aggregation_handler(
    df = df[df['kpi_id'].isin(input_kpi_list)].copy()

    if df.empty:
            logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
        logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing")
        return []

    # Define all possible aggregation methods
@@ -109,9 +118,9 @@ def aggregation_handler(

    # Process each KPI-specific task parameter
    for kpi_index, kpi_id in enumerate(input_kpi_list):

            # logger.info(f"1.Processing KPI: {kpi_id}")
        logger.debug(f"Processing KPI: {kpi_id}")
        kpi_task_parameters = thresholds["task_parameter"][kpi_index]
        logger.debug(f"KPI task parameters: {kpi_task_parameters}")

        # Get valid task parameters for this KPI
        valid_task_parameters = [
@@ -121,40 +130,55 @@ def aggregation_handler(

        # Select the aggregation methods based on valid task parameters
        selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
        logger.debug(f"Processing methods: {selected_methods}")

            # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
        kpi_df = df[df['kpi_id'] == kpi_id]
        logger.debug(f"KPI data frame:\n{kpi_df}")

        # Check if kpi_df is not empty before applying the aggregation methods
        if not kpi_df.empty:
            agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()

                # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
            logger.debug(f"Aggregated DataFrame for KPI {kpi_id}:\n{agg_df}")

            agg_df['kpi_id'] = output_kpi_list[kpi_index]

                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
            record = threshold_handler(key, agg_df, kpi_task_parameters)

                results.extend(record.to_dict(orient='records'))
            # Make the data frame agnostic to the aggregation method
            value_key = list(selected_methods.keys())[0]
            upd_record = transform_data(record, value_key)

            # Store the record
            results.extend(upd_record.to_dict(orient='records'))
        else:
                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
            logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation")
            continue

    if results:
        logger.info(f"Aggregation result: {results}")
        return results
    else:
        return []

def find(data , type , value):
    return next((item for item in data if item[type] == value), None)
def aggregation_handler_many_to_one(key, batch, input_kpi_list, output_kpi_list, thresholds):
    logger.info("AggregationHandlerManyToOne starts")
    if not batch:
        logger.warning("Empty batch received. Skipping processing.")
        return []

    logger.info(f"Processing {len(batch)} records for key: {key}")

def aggregation_handler_three_to_one(
        batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
):
    kpi_task_parameters = None
    for kpi_index, kpi_id in enumerate(input_kpi_list):
        logger.debug(f"Processing KPI: {kpi_id}")
        kpi_task_parameters = thresholds["task_parameter"][kpi_index]
        logger.debug(f"KPI task parameters: {kpi_task_parameters}")

    threshold_high, threshold_low = None, None
    for _, threshold_values in kpi_task_parameters.items():
        if isinstance(threshold_values, list) and len(threshold_values) == 2:
            threshold_low, threshold_high = threshold_values

    # Group and sum
    # Track sum and count
    sum_dict = defaultdict(int)
    count_dict = defaultdict(int)

@@ -173,13 +197,18 @@ def aggregation_handler_three_to_one(

    result = {
        "kpi_id": output_kpi_list[0],
        "avg": total_kpi_metric,
        "THRESHOLD_RAISE": bool(total_kpi_metric > 2600),
        "THRESHOLD_FALL": bool(total_kpi_metric < 699)
        "value": total_kpi_metric,
        "value_threshold_high": bool(total_kpi_metric > threshold_high),
        "value_threshold_low": bool(total_kpi_metric < threshold_low)
    }
    results = []

    results.append(result)
    logger.warning(f"result : {result}.")
    logger.info(f"Aggregation result: {result}")

    return results

HANDLER_FUNCTIONS = {
    Handlers.AGGREGATION_HANDLER: aggregation_handler,
    Handlers.AGGREGATION_HANDLER_MANY_TO_ONE: aggregation_handler_many_to_one
}
+41 −12
Original line number Diff line number Diff line
@@ -19,13 +19,11 @@ import logging

from confluent_kafka                            import KafkaException, KafkaError
from common.tools.kafka.Variables               import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler
from analytics.backend.service.AnalyzerHandlers import Handlers, select_handler, aggregation_handler
from analytics.backend.service.AnalyzerHelper   import AnalyzerHelper


logger = logging.getLogger(__name__)


class DaskStreamer(threading.Thread):
    def __init__(self, key, input_kpis, output_kpis, thresholds,
                 batch_size        = 5,
@@ -45,6 +43,14 @@ class DaskStreamer(threading.Thread):
        self.running        = True
        self.batch          = []

        logger.info(f"Dask Streamer            key: {self.key}")
        logger.info(f"Dask Streamer    input  KPIs: {self.input_kpis}")
        logger.info(f"Dask Streamer    output KPIs: {self.output_kpis}")
        logger.info(f"Dask Streamer     thresholds: {self.thresholds}")
        logger.info(f"Dask Streamer    window size: {self.window_size}")
        logger.info(f"Dask Streamer     batch size: {self.batch_size}")
        logger.info(f"Dask Streamer batch duration: {self.batch_duration}")

        # Initialize Kafka and Dask components
        self.client   = AnalyzerHelper.initialize_dask_client(cluster_instance)
        self.consumer = AnalyzerHelper.initialize_kafka_consumer()      # Single-threaded consumer
@@ -55,7 +61,7 @@ class DaskStreamer(threading.Thread):
    def run(self):
        """Main method to start the DaskStreamer."""
        try:
            logger.info("Starting Dask Streamer")
            logger.info("Dask Streamer started")
            last_batch_time = time.time()
            while True:
                if not self.consumer:
@@ -69,7 +75,7 @@ class DaskStreamer(threading.Thread):
                    break
                message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    logger.debug("No new messages received.")
                    continue
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
@@ -85,7 +91,12 @@ class DaskStreamer(threading.Thread):
                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode message: {message.value()}")
                        continue
                    # This streamer is only meant to serve a list of input KPIs
                    if value["kpi_id"] in self.input_kpis:
                        self.batch.append(value)
                    # Ignore the rest..
                    else:
                        continue

                # Window size has a precedence over batch size
                if self.batch_duration is None:
@@ -106,18 +117,32 @@ class DaskStreamer(threading.Thread):
            logger.exception(f"Error in Dask streaming process: {e}")
        finally:
            self.stop()
            logger.info(">>> Exiting Dask Streamer...")
            logger.info("Exiting Dask Streamer...")

    def task_handler_selector(self):
        """Select the task handler based on the task type."""
        logger.info(f"Batch to be processed: {self.batch}")
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
        handler_name = self.thresholds["task_type"]
        if Handlers.is_valid_handler(handler_name):
            if self.client is not None and self.client.status == 'running':
                logger.info(f"Selecting the handler for key {self.key}")
                logger.info(f"|--> Input  KPIs {self.input_kpis}")
                logger.info(f"|--> Output KPIs {self.output_kpis}")
                logger.info(f"|--> Thresholds  {self.thresholds}")
                try:
                    future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size",
                    handler_fn = select_handler(handler_name)
                    future = self.client.submit(
                        handler_fn,
                        self.key,
                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                        self.batch,
                        self.input_kpis,
                        self.output_kpis,
                        self.thresholds
                    )
                    logger.info(f"|--> Handler result  {future.result()}")
                    future.add_done_callback(
                        lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
@@ -132,6 +157,10 @@ class DaskStreamer(threading.Thread):
            logger.warning("Nothing to produce. Skipping.")
            return
        for record in result:
            # Filter out records not related with the output KPI of interest
            if record["kpi_id"] not in self.output_kpis:
                continue
            logger.info(f"Kafka Alarm - Record: {record}")
            try:
                self.producer.produce(
                    destination_topic,
+6 −5
Original line number Diff line number Diff line
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import json
import pytest
import logging
@@ -22,7 +21,6 @@ from unittest.mock import MagicMock, patch
from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                               get_windows_size, get_batch_size, get_agg_df, get_duration

from common.tools.kafka.Variables                      import KafkaTopic
from analytics.backend.service.Streamer                import DaskStreamer
from analytics.backend.service.AnalyzerHandlers        import aggregation_handler, threshold_handler
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
@@ -194,6 +192,7 @@ def test_produce_result(dask_streamer):
    result = [{"kpi_id": "kpi1", "value": 100}]
    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
            patch.object(dask_streamer.producer, 'produce') as mock_produce:
        dask_streamer.output_kpis = ['kpi1']
        dask_streamer.produce_result(result, "test_topic")
        mock_produce.assert_called_once_with(
            "test_topic",
@@ -221,6 +220,8 @@ def test_run_with_valid_consumer(dask_streamer):
    with patch.object(dask_streamer.consumer, 'poll')         as mock_poll, \
         patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
        
        dask_streamer.input_kpis = ['kpi1', 'kpi2']

        # Simulate valid messages without errors
        mock_message_1 = MagicMock()
        mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
@@ -261,7 +262,7 @@ def test_aggregation_handler():

    # Test aggregation_handler
    aggregated_df = aggregation_handler(
        "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
        "test_key", batch, input_kpi_list, output_kpi_list, thresholds
    )
    assert isinstance(aggregated_df, list)
    assert all(isinstance(item, dict) for item in aggregated_df)
@@ -276,7 +277,7 @@ def test_threshold_handler():
    result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])

    assert isinstance(result, pd.DataFrame)
    assert result.shape == (1, 7)
    assert result.shape == (1, 5)


###########################
Loading