Commit d6dc91c2 authored by Waleed Akbar

Changes in Analytics Backend

- Updated the position of the `stop_event` parameter.
- Added a PySpark configuration option to force deletion of temporary checkpoint locations.
- Added termination handling in the PySpark streamer for the `StopCollector` event.
parent 9e226625
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
@@ -47,10 +47,11 @@ class AnalyticsBackendService(GenericGrpcService):
         try:
             stop_event = threading.Event()
             thread = threading.Thread(target=SparkStreamer,
-                args=(kpi_list, oper_list, thresholds, window_size, window_slider, None,
-                      stop_event))
+                args=(kpi_list, oper_list, thresholds, stop_event,
+                      window_size, window_slider, None ))
             self.running_threads[analyzer_id] = (thread, stop_event)
-            # thread.start()
-            print ("Initiated Analyzer backend: {:}".format(analyzer_id))
+            thread.start()
             LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
             return True
         except Exception as e:
@@ -99,10 +100,9 @@ class AnalyticsBackendService(GenericGrpcService):
                 LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                 return True
             except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: ".format(analyzer_uuid, e))
+                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
                 return False
         else:
-            print ("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
             # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
             # generate confirmation towards frontend
@@ -13,7 +13,7 @@
 # limitations under the License.
-import logging
+import logging, time
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
 from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
@@ -25,6 +25,7 @@ def DefiningSparkSession():
     # Create a Spark session with specific spark verions (3.5.0)
     return SparkSession.builder \
         .appName("Analytics") \
+        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
         .getOrCreate()
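The new `spark.sql.streaming.forceDeleteTempCheckpointLocation` option tells Structured Streaming to delete temporary checkpoint directories when a query stops, so repeated analyzer restarts do not leave stale checkpoint state behind. A minimal session builder showing just that option; the Kafka package coordinate is copied from the diff.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Analytics") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
        .getOrCreate()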
@@ -73,7 +74,7 @@ def ApplyThresholds(aggregated_df, thresholds):
     )
     return aggregated_df

-def SparkStreamer(kpi_list, oper_list, thresholds,
+def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
                   window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
     Method to perform Spark operation Kafka stream.
@@ -136,10 +137,17 @@ def SparkStreamer(kpi_list, oper_list, thresholds,
                 .outputMode("update")

         # Start the query execution
-        query.start().awaitTermination()
+        queryHandler = query.start()
+        # Loop to check for stop event flag. To be set by stop collector method.
+        while True:
+            if stop_event.is_set():
+                print ("Stop Event activated. Terminating in 5 seconds...")
+                time.sleep(5)
+                queryHandler.stop()
+                break
+            time.sleep(5)
     except Exception as e:
         print("Error in Spark streaming process: {:}".format(e))
         LOGGER.debug("Error in Spark streaming process: {:}".format(e))
+    finally:
+        spark.stop()
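Instead of blocking on `awaitTermination()`, the streamer now keeps the query handle and polls `stop_event` every five seconds, stopping the query once the flag is set and stopping the session in `finally`. A condensed sketch of that shutdown pattern; `run_with_stop_flag` and `poll_seconds` are hypothetical names, and `query`, `stop_event`, and `spark` are assumed to be the streaming writer, threading.Event, and SparkSession used in the diff.

    import time

    def run_with_stop_flag(query, stop_event, spark, poll_seconds=5):
        # Start the streaming query and poll the stop flag instead of blocking forever.
        try:
            query_handler = query.start()
            while not stop_event.is_set():
                time.sleep(poll_seconds)
            print("Stop Event activated. Terminating...")
            query_handler.stop()    # stop the streaming query
        finally:
            spark.stop()            # release the Spark session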