diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 58700c5a0b5a60c51d02b05150fe643e8ead19dc..fa78a5106ca8b4a4a183fe0e028488e55c3659d2 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -36,7 +36,7 @@ class AnalyticsBackendService(GenericGrpcService):
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
         self.running_threads = {}   # To keep track of all running analyzers
-        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : '10.152.183.186:9092',
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})
diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py
index 3ccb643e3a5740ce89220d26db7871fb07b61a53..cdfe10bd35f68264865d4847c0d292658c330eec 100644
--- a/src/analytics/backend/service/DaskStreaming.py
+++ b/src/analytics/backend/service/DaskStreaming.py
@@ -43,6 +43,7 @@ def GetAggregationMappings(thresholds):
         agg_dict[threshold_key] = ('kpi_value', aggregation)
     return agg_dict
 
+
 def ApplyThresholds(aggregated_df, thresholds):
     """
     Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
@@ -53,12 +54,14 @@ def ApplyThresholds(aggregated_df, thresholds):
     """
     for threshold_key, threshold_values in thresholds.items():
         if threshold_key not in aggregated_df.columns:
+            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
             continue
         if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
             fail_th, raise_th = threshold_values
-            aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
-            aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
+            aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
+            aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
+            aggregated_df["value"] = aggregated_df[threshold_key]
         else:
             LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
     return aggregated_df
@@ -96,7 +99,7 @@ def process_batch(batch, agg_mappings, thresholds, key):
     df = pd.DataFrame(batch)
     LOGGER.info(f"df {df} ")
-    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce',unit='s')
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
     df.dropna(subset=['time_stamp'], inplace=True)
     LOGGER.info(f"df {df} ")
     required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
@@ -110,19 +113,44 @@ def process_batch(batch, agg_mappings, thresholds, key):
     # Perform aggregations using named aggregation
     try:
         agg_dict = {key: value for key, value in agg_mappings.items()}
-        df_agg = df.groupby(['window_start']).agg(**agg_dict).reset_index()
+
+        df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
+
+        # example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')}
+
+        # assumes a single threshold entry; take its aggregation operator
+        second_value_tuple = next(iter(agg_dict.values()))[1]
+        # in case there are multiple thresholds:
+        # second_values_tuples = [value[1] for value in agg_dict.values()]
+        if second_value_tuple == "min":
+            df_agg = df_agg_.min(numeric_only=True).to_frame().T
+        elif second_value_tuple == "max":
+            df_agg = df_agg_.max(numeric_only=True).to_frame().T
+        elif second_value_tuple == "std":
+            df_agg = df_agg_.std(numeric_only=True).to_frame().T
+        else:
+            df_agg = df_agg_.mean(numeric_only=True).to_frame().T
+
+        # Assign the first value of window_start from the original aggregated data
+        df_agg['window_start'] = df_agg_['window_start'].iloc[0]
+
+        # Reorder columns to place 'window_start' first if needed
+        cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
+        df_agg = df_agg[cols]
+
     except Exception as e:
         LOGGER.error(f"Aggregation error: {e}")
         return []
 
     # Apply thresholds
+
+
     df_thresholded = ApplyThresholds(df_agg, thresholds)
     df_thresholded['kpi_id'] = key
     df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
     # Convert aggregated DataFrame to list of dicts
     result = df_thresholded.to_dict(orient='records')
     LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
-
     return result
 
 def produce_result(result, producer, destination_topic):
@@ -197,7 +225,7 @@ def DaskStreamer(key, kpi_list, thresholds, stop_event,
                 continue
             try:
-                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce',unit='s')
+                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
                 LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
                 if pd.isna(message_timestamp):
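
For reference, a minimal standalone sketch of the code path this patch introduces in `process_batch`/`ApplyThresholds`: per-window named aggregation, collapse of all windows into a single row using the same operator, then the fixed-name threshold columns. The sample records, the 30-second window, and the `(TH-Fall, TH-Raise)` pair are hypothetical, and the Kafka plumbing of `DaskStreamer` is not reproduced:

```python
import pandas as pd

# Hypothetical batch of KPI samples; field names mirror those used in process_batch().
batch = [
    {'time_stamp': '2024-01-01T00:00:01Z', 'kpi_id': 'kpi-1', 'kpi_value': 12.0},
    {'time_stamp': '2024-01-01T00:00:31Z', 'kpi_id': 'kpi-1', 'kpi_value':  8.0},
    {'time_stamp': '2024-01-01T00:01:02Z', 'kpi_id': 'kpi-1', 'kpi_value': 25.0},
]
thresholds = {'min_latency_E2E': (5.0, 20.0)}           # (TH-Fall, TH-Raise), made-up values
agg_dict   = {'min_latency_E2E': ('kpi_value', 'min')}  # shape produced by GetAggregationMappings()

df = pd.DataFrame(batch)
df['time_stamp']   = pd.to_datetime(df['time_stamp'], errors='coerce')
df['window_start'] = df['time_stamp'].dt.floor('30s')   # 30 s windows, chosen arbitrarily here

# Per-window named aggregation, then collapse all windows into a single row
# with the same operator ('min'), as the patched process_batch() now does.
df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
df_agg  = df_agg_.min(numeric_only=True).to_frame().T
df_agg['window_start'] = df_agg_['window_start'].iloc[0]

# ApplyThresholds() now emits the fixed columns THRESHOLD_FALL / THRESHOLD_RAISE / value
# instead of per-KPI column names such as min_latency_E2E_THRESHOLD_FALL.
fail_th, raise_th = thresholds['min_latency_E2E']
df_agg['THRESHOLD_FALL']  = df_agg['min_latency_E2E'] < fail_th
df_agg['THRESHOLD_RAISE'] = df_agg['min_latency_E2E'] > raise_th
df_agg['value']           = df_agg['min_latency_E2E']

print(df_agg.to_dict(orient='records'))
# -> one record carrying the overall minimum (8.0), with both alarm flags False
```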