diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 26d3c26d878795eb5a07cbf228f2e15d02d4b500..73aa7502596fada37a3881da3f6774e95c70f257 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -44,7 +44,7 @@ def DefiningRequestSchema():
         StructField("kpi_value" , DoubleType() , True)
     ])
 
-def get_aggregations(oper_list):
+def GetAggregations(oper_list):
     # Define the possible aggregation functions
     agg_functions = {
         'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"),
@@ -56,7 +56,7 @@ def get_aggregations(oper_list):
     }
     return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations
 
-def apply_thresholds(aggregated_df, thresholds):
+def ApplyThresholds(aggregated_df, thresholds):
     # Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
 
     # Loop through each column name and its associated thresholds
@@ -114,9 +114,9 @@ def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None
             ),
             col("kpi_id")
         ) \
-        .agg(*get_aggregations(oper_list))
+        .agg(*GetAggregations(oper_list))
 
         # Apply thresholds to the aggregated data
-        thresholded_stream_data = apply_thresholds(windowed_stream_data, thresholds)
+        thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
         # --- This will write output on console: FOR TESTING PURPOSES
         # Start the Spark streaming query
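
The diff itself is a rename-only change: `get_aggregations` and `apply_thresholds` become `GetAggregations` and `ApplyThresholds` (matching the module's CamelCase naming, e.g. `DefiningRequestSchema`, `SparkStreamer`), and the two call sites inside `SparkStreamer` are updated accordingly. For orientation, below is a minimal batch-mode sketch of how the two helpers compose. It is an illustration under stated assumptions, not the module's actual code: the diff only shows the `kpi_id`/`kpi_value` columns and the `avg` aggregation entry, so the `min`/`max` entries, the `(fail, raise)` threshold-tuple layout, the flag-column names, and the sample data are all hypothetical.

```python
# Hypothetical sketch; only GetAggregations, ApplyThresholds, "kpi_id" and
# "kpi_value" come from the diff itself. Requires PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col
from pyspark.sql.functions import max as max_, min as min_, round as round_

def GetAggregations(oper_list):
    # Map operation names to Spark aggregation expressions and keep only the
    # ones requested, mirroring the filter expression shown in the diff.
    agg_functions = {
        'avg': round_(avg("kpi_value"), 3).alias("avg_value"),
        'min': round_(min_("kpi_value"), 3).alias("min_value"),
        'max': round_(max_("kpi_value"), 3).alias("max_value"),
    }
    return [agg_functions[op] for op in oper_list if op in agg_functions]

def ApplyThresholds(aggregated_df, thresholds):
    # Assumed threshold layout: {column_name: (fail_threshold, raise_threshold)}.
    # Adds one boolean flag column per threshold crossing (names are assumed).
    for col_name, (th_fail, th_raise) in thresholds.items():
        aggregated_df = (
            aggregated_df
            .withColumn(f"{col_name}_TH_RAISE", col(col_name) > th_raise)
            .withColumn(f"{col_name}_TH_FALL",  col(col_name) < th_fail)
        )
    return aggregated_df

if __name__ == "__main__":
    spark = SparkSession.builder.appName("agg-threshold-sketch").getOrCreate()
    df = spark.createDataFrame(
        [("kpi-1", 0.91), ("kpi-1", 0.42), ("kpi-2", 0.10)],
        ["kpi_id", "kpi_value"],
    )
    aggregated = df.groupBy("kpi_id").agg(*GetAggregations(['avg', 'max']))
    flagged = ApplyThresholds(aggregated, {"avg_value": (0.20, 0.80)})
    flagged.show()  # avg/max per KPI plus the TH_RAISE / TH_FALL booleans
    spark.stop()
```

On the streaming side the same two calls slot in unchanged after the windowed `groupBy` stage, as the third hunk shows; the sketch simply swaps the streaming source for a static DataFrame so it can run standalone.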