diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index f9fcf47ec6c704308d49ff03483a44e096c68320..899525fc6e3e89cf40409ace155d91a03437f4d1 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -47,10 +47,11 @@ class AnalyticsBackendService(GenericGrpcService):
         try:
             stop_event = threading.Event()
             thread = threading.Thread(target=SparkStreamer,
-                args=(kpi_list, oper_list, thresholds, window_size, window_slider, None,
-                      stop_event))
+                args=(kpi_list, oper_list, thresholds, stop_event,
+                      window_size, window_slider, None ))
             self.running_threads[analyzer_id] = (thread, stop_event)
-            # thread.start()
+            thread.start()
+            print ("Initiated Analyzer backend: {:}".format(analyzer_id))
             LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
             return True
         except Exception as e:
@@ -99,10 +100,9 @@ class AnalyticsBackendService(GenericGrpcService):
                 LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                 return True
             except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: ".format(analyzer_uuid, e))
+                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
                 return False
         else:
-            print ("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
             # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
             # generate confirmation towards frontend
-
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 11ec9fe5f9e3224d74f788f51bc45287515b098d..202076ed52f61dae1ac397fa162d3517919f2ce8 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 
-import logging
+import logging, time
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
 from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
@@ -25,6 +25,7 @@ def DefiningSparkSession():
     # Create a Spark session with specific spark verions (3.5.0)
     return SparkSession.builder \
             .appName("Analytics") \
+            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
             .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
             .getOrCreate()
 
@@ -73,7 +74,7 @@ def ApplyThresholds(aggregated_df, thresholds):
         )
     return aggregated_df
 
-def SparkStreamer(kpi_list, oper_list, thresholds,
+def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
                   window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
     Method to perform Spark operation Kafka stream.
@@ -136,10 +137,17 @@ def SparkStreamer(kpi_list, oper_list, thresholds,
             .outputMode("update")
 
         # Start the query execution
-        query.start().awaitTermination()
+        queryHandler = query.start()
+
+        # Loop to check for stop event flag. To be set by stop collector method.
+        while True:
+            if stop_event.is_set():
+                print ("Stop Event activated. Terminating in 5 seconds...")
+                time.sleep(5)
+                queryHandler.stop()
+                break
+            time.sleep(5)
 
     except Exception as e:
         print("Error in Spark streaming process: {:}".format(e))
         LOGGER.debug("Error in Spark streaming process: {:}".format(e))
-    finally:
-        spark.stop()
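
The patch replaces the blocking query.start().awaitTermination() call with a query handle plus a polling loop, so that the threading.Event stored in self.running_threads can actually interrupt the stream when the backend is asked to terminate an analyzer. Below is a minimal, Spark-free sketch of that shutdown pattern; the worker/FakeQuery names and the poll intervals are illustrative assumptions and are not part of the patch.

import threading
import time

class FakeQuery:
    # Stand-in for the handle returned by query.start() (illustrative only;
    # the real code stops a Spark structured-streaming query).
    def stop(self):
        print("streaming query stopped")

def worker(stop_event, query):
    # Poll the stop flag instead of blocking forever, as awaitTermination() would.
    while True:
        if stop_event.is_set():
            print("Stop event set; stopping query")
            query.stop()
            break
        time.sleep(1)  # short poll interval for the example; the patch sleeps 5 seconds

if __name__ == "__main__":
    stop_event = threading.Event()
    thread = threading.Thread(target=worker, args=(stop_event, FakeQuery()))
    thread.start()     # analogous to starting the analyzer backend thread
    time.sleep(3)      # let the "stream" run briefly
    stop_event.set()   # analogous to the TerminateBackend path setting the event
    thread.join()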