From 2183218975578c96f380cf69d6b5aea017dba41c Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Thu, 5 Sep 2024 10:29:44 +0000
Subject: [PATCH] Changes in Analytics Backend.

- In AnalyticsBackendService.py:
  + Updated the SparkStreamer call with new parameters.
- In SparkStreaming.py:
  + Added 'get_aggregations' and 'apply_thresholds' methods.
  + Added 'window_size', 'win_slide_duration', 'time_stamp_col' and 'thresholds' parameters.
- Added new test messages.
- Updated the 'RunSparkStreamer' test call with new parameters.
---
 .../service/AnalyticsBackendService.py       |  9 +-
 .../backend/service/SparkStreaming.py        | 90 +++++++++++++------
 src/analytics/backend/tests/messages.py      | 19 ++++
 src/analytics/backend/tests/test_backend.py  | 23 ++---
 4 files changed, 101 insertions(+), 40 deletions(-)

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 5331d027d..2842e2374 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -33,8 +33,13 @@ class AnalyticsBackendService(GenericGrpcService):
             'group.id'          : 'analytics-frontend',
             'auto.offset.reset' : 'latest'})
 
-    def RunSparkStreamer(self, kpi_list):
-        threading.Thread(target=SparkStreamer, args=(kpi_list,)).start()
+    def RunSparkStreamer(self, kpi_list, oper_list, thresholds_dict):
+        print ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
+        LOGGER.debug ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
+        threading.Thread(target=SparkStreamer,
+                         args=(kpi_list, oper_list, None, None, thresholds_dict, None)
+                         ).start()
+        return True
 
     def RunRequestListener(self)->bool:
         threading.Thread(target=self.RequestListener).start()
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 245a77d80..26d3c26d8 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -15,8 +15,8 @@
 
 import logging
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, DoubleType
-from pyspark.sql.functions import from_json, col
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
+from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 
 LOGGER = logging.getLogger(__name__)
@@ -44,22 +44,50 @@ def DefiningRequestSchema():
         StructField("kpi_value" , DoubleType() , True)
     ])
 
-def SettingKafkaProducerParams():
-    return {
-        "kafka.bootstrap.servers" : KafkaConfig.get_kafka_address(),
-        "topic"                   : KafkaTopic.ANALYTICS_RESPONSE.value
+def get_aggregations(oper_list):
+    # Define the possible aggregation functions
+    agg_functions = {
+        'avg'  : round(avg   ("kpi_value"), 3).alias("avg_value"),
+        'min'  : round(min   ("kpi_value"), 3).alias("min_value"),
+        'max'  : round(max   ("kpi_value"), 3).alias("max_value"),
+        'first': round(first ("kpi_value"), 3).alias("first_value"),
+        'last' : round(last  ("kpi_value"), 3).alias("last_value"),
+        'stdev': round(stddev("kpi_value"), 3).alias("stdev_value")
     }
+    return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations
+
+def apply_thresholds(aggregated_df, thresholds):
+    # Apply thresholds (TH-Fail and TH-RAISE) from the thresholds dictionary to the aggregated DataFrame.
+
+    # Loop through each column name and its associated thresholds
+    for col_name, (fail_th, raise_th) in thresholds.items():
+        # Apply TH-Fail condition (if the column value is less than the fail threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_FAIL",
+            when(col(col_name) < fail_th, True).otherwise(False)
+        )
+        # Apply TH-RAISE condition (if the column value is greater than the raise threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_RAISE",
+            when(col(col_name) > raise_th, True).otherwise(False)
+        )
+    return aggregated_df
 
-def SparkStreamer(kpi_list):
+def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None, thresholds=None, time_stamp_col=None):
     """
     Method to perform Spark operations on a Kafka stream.
     NOTE: The Kafka topic to be processed should have at least one row before initiating the Spark session.
     """
-    kafka_producer_params = SettingKafkaConsumerParams()   # Define the Kafka producer parameters
     kafka_consumer_params = SettingKafkaConsumerParams()   # Define the Kafka consumer parameters
     schema                = DefiningRequestSchema()        # Define the schema for the incoming JSON data
     spark                 = DefiningSparkSession()         # Define the spark session with app name and spark version
-    
+
+    # Assign defaults for the optional parameters
+    if window_size        is None: window_size        = "60 seconds"    # default
+    if win_slide_duration is None: win_slide_duration = "30 seconds"    # default
+    if time_stamp_col     is None: time_stamp_col     = "time_stamp"    # default
+    if thresholds         is None: thresholds         = {}              # no thresholds will be applied
+
     try:
         # Read data from Kafka
         raw_stream_data = spark \
@@ -74,36 +102,42 @@
         parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
         # Select the parsed fields
         final_stream_data = parsed_stream_data.select("parsed_value.*")
+        # Convert the time_stamp to a proper timestamp (assuming it is in ISO format)
+        final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
         # Filter the stream to only include rows where the kpi_id is in the kpi_list
        filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
+        # Define a window for aggregation
+        windowed_stream_data = filtered_stream_data \
+            .groupBy(
+                window( col(time_stamp_col),
+                        window_size, slideDuration=win_slide_duration
+                      ),
+                col("kpi_id")
+            ) \
+            .agg(*get_aggregations(oper_list))
+        # Apply thresholds to the aggregated data
+        thresholded_stream_data = apply_thresholds(windowed_stream_data, thresholds)
+
+        # --- This will write the output to the console: FOR TESTING PURPOSES
+        # Start the Spark streaming query
+        # query = thresholded_stream_data \
+        #     .writeStream \
+        #     .outputMode("update") \
+        #     .format("console")
 
-        query = filtered_stream_data \
+        # --- This will write the output to Kafka: ACTUAL IMPLEMENTATION
+        query = thresholded_stream_data \
             .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
             .writeStream \
             .format("kafka") \
             .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
             .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
-            .option("checkpointLocation", "/home/tfs/sparkcheckpoint") \
-            .outputMode("append")
-
-        # Start the Spark streaming query and write the output to the Kafka topic
-        # query = filtered_stream_data \
-        #     .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
-        #     .writeStream \
-        #     .format("kafka") \
-        #     .option(**kafka_producer_params) \
-        #     .option("checkpointLocation", "sparkcheckpoint") \
-        #     .outputMode("append") \
-        #     .start()
-
-        # Start the Spark streaming query
-        # query = filtered_stream_data \
-        #     .writeStream \
-        #     .outputMode("append") \
-        #     .format("console")    # You can change this to other output modes or sinks
+            .option("checkpointLocation", "analytics/.spark/checkpoint") \
+            .outputMode("update")
 
         # Start the query execution
         query.start().awaitTermination()
+
     except Exception as e:
         print("Error in Spark streaming process: {:}".format(e))
        LOGGER.debug("Error in Spark streaming process: {:}".format(e))
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index 5cf553eaa..c4a26a1ac 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -13,3 +13,22 @@
 # limitations under the License.
 
 
+def get_kpi_id_list():
+    return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
+
+def get_operation_list():
+    return [ 'avg', 'max' ]     # possibilities ['avg', 'min', 'max', 'first', 'last', 'stdev']
+
+def get_threshold_dict():
+    threshold_dict = {
+        'avg_value'   : (20, 30),
+        'min_value'   : (00, 10),
+        'max_value'   : (45, 50),
+        'first_value' : (00, 10),
+        'last_value'  : (40, 50),
+        'stdev_value' : (00, 10),
+    }
+    # Filter threshold_dict based on the operation_list
+    return {
+        op + '_value': threshold_dict[op + '_value'] for op in get_operation_list() if op + '_value' in threshold_dict
+    }
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 426c89e54..9e8a0832d 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -15,7 +15,7 @@
 import logging
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
-
+from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
 
 LOGGER = logging.getLogger(__name__)
 
@@ -25,10 +25,10 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-# def test_validate_kafka_topics():
-#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-#     response = KafkaTopic.create_all_topics()
-#     assert isinstance(response, bool)
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
 
 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
@@ -37,8 +37,11 @@ def test_RunRequestListener():
     LOGGER.debug(str(response))
 
 
-# def test_SparkListener():
-#     LOGGER.info('test_RunRequestListener')
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.RunSparkStreamer()
-#     LOGGER.debug(str(response))
+def test_SparkListener():
+    LOGGER.info('test_SparkListener')
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    response = AnalyticsBackendServiceObj.RunSparkStreamer(
+        get_kpi_id_list(), get_operation_list(), get_threshold_dict()
+    )
+    LOGGER.debug(str(response))
+    assert isinstance(response, bool)
-- 
GitLab