Commit fa2fbda4 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Make some changes on the response object that been written to the kafka topic.

parent 18a6c2e6
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ class AnalyticsBackendService(GenericGrpcService):
        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
        super().__init__(port, cls_name=cls_name)
        self.running_threads = {}       # To keep track of all running analyzers 
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : '10.152.183.186:9092',
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                            'group.id'           : 'analytics-frontend',
                                            'auto.offset.reset'  : 'latest'})

+34 −6
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ def GetAggregationMappings(thresholds):
        agg_dict[threshold_key] = ('kpi_value', aggregation)
    return agg_dict


def ApplyThresholds(aggregated_df, thresholds):
    """
    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
@@ -53,12 +54,14 @@ def ApplyThresholds(aggregated_df, thresholds):
    """
    for threshold_key, threshold_values in thresholds.items():
        if threshold_key not in aggregated_df.columns:

            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
            continue
        if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
            fail_th, raise_th = threshold_values
            aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
            aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
            aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
            aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
            aggregated_df["value"] = aggregated_df[threshold_key]
        else:
            LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
    return aggregated_df
@@ -96,7 +99,7 @@ def process_batch(batch, agg_mappings, thresholds, key):

    df = pd.DataFrame(batch)
    LOGGER.info(f"df {df} ")
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce',unit='s')
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
    df.dropna(subset=['time_stamp'], inplace=True)
    LOGGER.info(f"df {df} ")
    required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
@@ -110,19 +113,44 @@ def process_batch(batch, agg_mappings, thresholds, key):
    # Perform aggregations using named aggregation
    try:
        agg_dict = {key: value for key, value in agg_mappings.items()}
        df_agg = df.groupby(['window_start']).agg(**agg_dict).reset_index()

        df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()

        #example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')

        #given that threshold has 1 value
        second_value_tuple = next(iter(agg_dict.values()))[1]
        #in case we have multiple thresholds!
        #second_values_tuples = [value[1] for value in agg_dict.values()]
        if second_value_tuple=="min":
            df_agg = df_agg_.min(numeric_only=True).to_frame().T
        elif second_value_tuple == "max":
            df_agg = df_agg_.max(numeric_only=True).to_frame().T
        elif second_value_tuple == "std":
            df_agg = df_agg_.sted(numeric_only=True).to_frame().T
        else:
            df_agg = df_agg_.mean(numeric_only=True).to_frame().T

        # Assign the first value of window_start from the original aggregated data
        df_agg['window_start'] = df_agg_['window_start'].iloc[0]

        # Reorder columns to place 'window_start' first if needed
        cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
        df_agg = df_agg[cols]

    except Exception as e:
        LOGGER.error(f"Aggregation error: {e}")
        return []

    # Apply thresholds


    df_thresholded = ApplyThresholds(df_agg, thresholds)
    df_thresholded['kpi_id'] = key
    df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    # Convert aggregated DataFrame to list of dicts
    result = df_thresholded.to_dict(orient='records')
    LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")

    return result

def produce_result(result, producer, destination_topic):
@@ -197,7 +225,7 @@ def DaskStreamer(key, kpi_list, thresholds, stop_event,
                continue

            try:
                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce',unit='s')
                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
                LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")

                if pd.isna(message_timestamp):