From faa6c684f08a62da7ab5bc871b7680179961375b Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Thu, 5 Sep 2024 10:32:09 +0000
Subject: [PATCH] Changes in Analytics Backend.

- Updated function names.
---
 src/analytics/backend/service/SparkStreaming.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 26d3c26d8..73aa75025 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -44,7 +44,7 @@ def DefiningRequestSchema():
             StructField("kpi_value" , DoubleType() , True)
         ])
 
-def get_aggregations(oper_list):
+def GetAggregations(oper_list):
     # Define the possible aggregation functions
     agg_functions = {
         'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"),
@@ -56,7 +56,7 @@ def get_aggregations(oper_list):
     }
     return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations
 
-def apply_thresholds(aggregated_df, thresholds):
+def ApplyThresholds(aggregated_df, thresholds):
     # Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
 
     # Loop through each column name and its associated thresholds
@@ -114,9 +114,9 @@ def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None
             ),
             col("kpi_id")
         ) \
-        .agg(*get_aggregations(oper_list))
+        .agg(*GetAggregations(oper_list))
     # Apply thresholds to the aggregated data
-    thresholded_stream_data = apply_thresholds(windowed_stream_data, thresholds)
+    thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
 
     # --- This will write output on console: FOR TESTING PURPOSES
     # Start the Spark streaming query
-- 
GitLab
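
For reference, below is a minimal sketch of how the renamed helpers could be exercised on a static DataFrame, assuming a local PySpark session and that the module is importable as analytics.backend.service.SparkStreaming (mirroring the file path in the diff). The sample rows, the "avg" operation name, and the (fail, raise) tuple shape passed to ApplyThresholds are illustrative assumptions, not taken from the repository.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assumed import path, mirroring src/analytics/backend/service/SparkStreaming.py
from analytics.backend.service.SparkStreaming import GetAggregations, ApplyThresholds

spark = SparkSession.builder.appName("AggregationSketch").getOrCreate()

# Small in-memory sample of KPI records (kpi_id, kpi_value), matching the schema in the diff
samples = spark.createDataFrame(
    [("kpi-1", 10.0), ("kpi-1", 12.5), ("kpi-2", 7.3)],
    ["kpi_id", "kpi_value"],
)

# GetAggregations maps operation names to Spark aggregation expressions;
# unsupported names are filtered out, so only known operations are applied.
aggregated = samples.groupBy(col("kpi_id")).agg(*GetAggregations(["avg"]))

# ApplyThresholds checks aggregated columns against TH-Fail / TH-RAISE bounds;
# the {"column": (fail, raise)} dictionary shape used here is an assumption for illustration.
flagged = ApplyThresholds(aggregated, {"avg_value": (5.0, 15.0)})
flagged.show()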