From d6dc91c284e370c3aa64fd854bb0120bd2ca565f Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Sat, 7 Sep 2024 16:48:20 +0000
Subject: [PATCH] Changes in Analytics Backend

- Moved the `stop_event` parameter ahead of the optional arguments in `SparkStreamer`.
- Added the Spark option `spark.sql.streaming.forceDeleteTempCheckpointLocation` so temporary PySpark checkpoint locations are deleted automatically.
- Added PySpark termination handling for the `StopCollector` event: the streaming query is now started non-blocking and stopped once `stop_event` is set (see the sketch below).
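
For reference, the termination handling follows the stop-event polling pattern sketched
below. This is a minimal, illustrative sketch only, not the exact code in
SparkStreaming.py / AnalyticsBackendService.py; the `run_streamer` name, the
`poll_interval` argument, and the commented-out query-handler calls are placeholders.

    import threading, time

    def run_streamer(stop_event: threading.Event, poll_interval: float = 5.0):
        # query_handler = query.start()    # placeholder: non-blocking start of the Spark query
        while not stop_event.is_set():     # poll until StopCollector sets the flag
            time.sleep(poll_interval)
        # query_handler.stop()             # placeholder: stop the streaming query gracefully

    stop_event = threading.Event()
    thread = threading.Thread(target=run_streamer, args=(stop_event,))
    thread.start()
    stop_event.set()    # what the StopCollector / TerminateBackend path does
    thread.join()

In the service itself the (thread, stop_event) pair is stored per analyzer in
self.running_threads, so the terminate path can look it up and set the event.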
---
 .../backend/service/AnalyticsBackendService.py | 12 ++++++------
 .../backend/service/SparkStreaming.py          | 18 +++++++++++++-----
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index f9fcf47ec..899525fc6 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -47,10 +47,11 @@ class AnalyticsBackendService(GenericGrpcService):
         try:
             stop_event = threading.Event()
             thread = threading.Thread(target=SparkStreamer, 
-                            args=(kpi_list, oper_list, thresholds, window_size, window_slider, None, 
-                                stop_event))
+                            args=(kpi_list, oper_list, thresholds, stop_event,
+                                  window_size, window_slider, None ))
             self.running_threads[analyzer_id] = (thread, stop_event)
-            # thread.start()
+            thread.start()
+            print      ("Initiated Analyzer backend: {:}".format(analyzer_id))
             LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
             return True
         except Exception as e:
@@ -99,10 +100,9 @@ class AnalyticsBackendService(GenericGrpcService):
                 LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                 return True
             except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: ".format(analyzer_uuid, e))
+                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
                 return False
         else:
-            print         ("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
             # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
             # generate confirmation towards frontend
-
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 11ec9fe5f..202076ed5 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 
-import logging
+import logging, time
 from pyspark.sql                  import SparkSession
 from pyspark.sql.types            import StructType, StructField, StringType, DoubleType, TimestampType
 from pyspark.sql.functions        import from_json, col, window, avg, min, max, first, last, stddev, when, round
@@ -25,6 +25,7 @@ def DefiningSparkSession():
     # Create a Spark session with a specific Spark version (3.5.0)
     return SparkSession.builder \
             .appName("Analytics") \
+            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
             .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
             .getOrCreate()
 
@@ -73,7 +74,7 @@ def ApplyThresholds(aggregated_df, thresholds):
         )
     return aggregated_df
 
-def SparkStreamer(kpi_list, oper_list, thresholds, 
+def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
                   window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
     Method to perform Spark operation Kafka stream.
@@ -136,10 +137,17 @@ def SparkStreamer(kpi_list, oper_list, thresholds,
             .outputMode("update")
 
         # Start the query execution
-        query.start().awaitTermination()
+        queryHandler = query.start()
+
+        # Poll the stop-event flag, which is set by the StopCollector handler.
+        while True:
+            if stop_event.is_set():
+                print("Stop Event activated. Terminating in 5 seconds...")
+                time.sleep(5)
+                queryHandler.stop()
+                break
+            time.sleep(5)
 
     except Exception as e:
         print("Error in Spark streaming process: {:}".format(e))
         LOGGER.debug("Error in Spark streaming process: {:}".format(e))
-    finally:
-        spark.stop()
-- 
GitLab