Skip to content
Snippets Groups Projects
Commit faa6c684 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics Backend.

- Updated function names.
parent 21832189
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -44,7 +44,7 @@ def DefiningRequestSchema(): ...@@ -44,7 +44,7 @@ def DefiningRequestSchema():
StructField("kpi_value" , DoubleType() , True) StructField("kpi_value" , DoubleType() , True)
]) ])
def get_aggregations(oper_list): def GetAggregations(oper_list):
# Define the possible aggregation functions # Define the possible aggregation functions
agg_functions = { agg_functions = {
'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"), 'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"),
...@@ -56,7 +56,7 @@ def get_aggregations(oper_list): ...@@ -56,7 +56,7 @@ def get_aggregations(oper_list):
} }
return [agg_functions[op] for op in oper_list if op in agg_functions] # Filter and return only the selected aggregations return [agg_functions[op] for op in oper_list if op in agg_functions] # Filter and return only the selected aggregations
def apply_thresholds(aggregated_df, thresholds): def ApplyThresholds(aggregated_df, thresholds):
# Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame. # Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
# Loop through each column name and its associated thresholds # Loop through each column name and its associated thresholds
...@@ -114,9 +114,9 @@ def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None ...@@ -114,9 +114,9 @@ def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None
), ),
col("kpi_id") col("kpi_id")
) \ ) \
.agg(*get_aggregations(oper_list)) .agg(*GetAggregations(oper_list))
# Apply thresholds to the aggregated data # Apply thresholds to the aggregated data
thresholded_stream_data = apply_thresholds(windowed_stream_data, thresholds) thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
# --- This will write output on console: FOR TESTING PURPOSES # --- This will write output on console: FOR TESTING PURPOSES
# Start the Spark streaming query # Start the Spark streaming query
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment