diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index bd28a21bf900554f9c883f9cde8c89dc76357637..bc7420d540e1e2353faf6fe999f8c539693b80fc 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -30,8 +30,9 @@ message AnalyzerId {
 }
 
 enum AnalyzerOperationMode {
-  ANALYZEROPERATIONMODE_BATCH     = 0;
-  ANALYZEROPERATIONMODE_STREAMING = 1;
+  ANALYZEROPERATIONMODE_UNSPECIFIED = 0;
+  ANALYZEROPERATIONMODE_BATCH       = 1;
+  ANALYZEROPERATIONMODE_STREAMING   = 2;
 }
 
 message Analyzer {
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 73aa7502596fada37a3881da3f6774e95c70f257..175222b5968cb5406f21d802becb8d871d31a550 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -73,7 +73,7 @@ def ApplyThresholds(aggregated_df, thresholds):
     )
     return aggregated_df
 
-def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None, thresholds=None, time_stamp_col=None):
+def SparkStreamer(kpi_list, oper_list, thresholds, window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
     Method to perform Spark operation Kafka stream.
     NOTE: Kafka topic to be processesd should have atleast one row before initiating the spark session.
@@ -86,7 +86,6 @@ def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None
     if window_size        is None: window_size        = "60 seconds"    # default
     if win_slide_duration is None: win_slide_duration = "30 seconds"    # default
     if time_stamp_col     is None: time_stamp_col     = "time_stamp"    # default
-    if thresholds         is None: thresholds         = {}              # No threshold will be applied
 
     try:
         # Read data from Kafka
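
Two implications of this change are worth noting. In the proto file, giving enum value 0 to ANALYZEROPERATIONMODE_UNSPECIFIED follows the proto3 convention that the zero value serves as an unset/default sentinel; renumbering BATCH and STREAMING does change their wire encoding, so any Analyzer messages serialized under the old numbering would decode to the wrong mode. In the Python file, `thresholds` is promoted to a required positional parameter of `SparkStreamer` and the `thresholds is None` fallback is removed, so every caller must now pass a dict explicitly; an empty dict preserves the old "no thresholds applied" behavior.

A minimal call-site sketch under the new signature follows. The KPI identifiers, operation names, and threshold dict are illustrative assumptions, not values from this change, and the import path is inferred from the file path shown in the diff:

    # Hypothetical usage of SparkStreamer after this change: `thresholds`
    # is now mandatory and precedes the optional keyword arguments.
    from analytics.backend.service.SparkStreaming import SparkStreamer  # path assumed from the diff

    kpi_list   = ['kpi-uuid-1', 'kpi-uuid-2']   # placeholder KPI identifiers
    oper_list  = ['avg', 'max']                 # placeholder aggregation operations
    thresholds = {}                             # required now; {} applies no thresholds

    SparkStreamer(kpi_list, oper_list, thresholds,
                  window_size        = "60 seconds",
                  win_slide_duration = "30 seconds",
                  time_stamp_col     = "time_stamp")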