Commit 21832189 authored by Waleed Akbar

Changes in Analytics Backend.

- In AnalyticsBackendService.py:
    + Updated the SparkStreamer call with new parameters.
- In SparkStreaming.py:
    + Added 'get_aggregations' and 'apply_thresholds' methods.
    + Added 'window_size', 'win_slide_duration', 'time_stamp_col' and 'thresholds' parameters.
- Added new test messages (messages.py).
- Updated the 'RunSparkStreamer' call with new parameters.
parent e7e36f7d
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
@@ -33,8 +33,13 @@ class AnalyticsBackendService(GenericGrpcService):
             'group.id'          : 'analytics-frontend',
             'auto.offset.reset' : 'latest'})

-    def RunSparkStreamer(self, kpi_list):
-        threading.Thread(target=SparkStreamer, args=(kpi_list,)).start()
+    def RunSparkStreamer(self, kpi_list, oper_list, thresholds_dict):
+        print("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
+        LOGGER.debug("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
+        threading.Thread(target=SparkStreamer,
+                         args=(kpi_list, oper_list, None, None, thresholds_dict, None)
+                        ).start()
+        return True

     def RunRequestListener(self)->bool:
         threading.Thread(target=self.RequestListener).start()
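A minimal usage sketch of the new entry point (an editor's illustration, not part of the commit; the sample values are borrowed from the test helpers in messages.py shown further below, and a reachable Kafka/Spark setup is assumed):

    svc = AnalyticsBackendService()
    svc.RunSparkStreamer(
        kpi_list        = ["6e22f180-ba28-4641-b190-2287bf448888"],      # KPI UUIDs to keep
        oper_list       = ['avg', 'max'],                                # aggregations to compute
        thresholds_dict = {'avg_value': (20, 30), 'max_value': (45, 50)}
    )   # returns True immediately; SparkStreamer runs in a background thread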
@@ -15,8 +15,8 @@
 import logging
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, DoubleType
-from pyspark.sql.functions import from_json, col
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
+from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

 LOGGER = logging.getLogger(__name__)
@@ -44,22 +44,50 @@ def DefiningRequestSchema():
         StructField("kpi_value" , DoubleType() , True)
     ])

-def SettingKafkaProducerParams():
-    return {
-        "kafka.bootstrap.servers" : KafkaConfig.get_kafka_address(),
-        "topic"                   : KafkaTopic.ANALYTICS_RESPONSE.value
-    }
+def get_aggregations(oper_list):
+    # Define the possible aggregation functions
+    agg_functions = {
+        'avg'  : round(avg   ("kpi_value"), 3).alias("avg_value"),
+        'min'  : round(min   ("kpi_value"), 3).alias("min_value"),
+        'max'  : round(max   ("kpi_value"), 3).alias("max_value"),
+        'first': round(first ("kpi_value"), 3).alias("first_value"),
+        'last' : round(last  ("kpi_value"), 3).alias("last_value"),
+        'stdev': round(stddev("kpi_value"), 3).alias("stdev_value")
+    }
+    # Filter and return only the selected aggregations
+    return [agg_functions[op] for op in oper_list if op in agg_functions]
+
+def apply_thresholds(aggregated_df, thresholds):
+    # Apply thresholds (TH-FAIL and TH-RAISE) from the thresholds dictionary
+    # to the aggregated DataFrame.
+    # Loop through each column name and its associated (fail, raise) pair.
+    for col_name, (fail_th, raise_th) in thresholds.items():
+        # TH-FAIL condition: the column value is below the fail threshold
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_FAIL",
+            when(col(col_name) < fail_th, True).otherwise(False)
+        )
+        # TH-RAISE condition: the column value is above the raise threshold
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_RAISE",
+            when(col(col_name) > raise_th, True).otherwise(False)
+        )
+    return aggregated_df

-def SparkStreamer(kpi_list):
+def SparkStreamer(kpi_list, oper_list, window_size=None, win_slide_duration=None, thresholds=None, time_stamp_col=None):
     """
     Method to perform a Spark operation on a Kafka stream.
     NOTE: The Kafka topic to be processed should have at least one row before the Spark session is initiated.
     """
-    kafka_producer_params = SettingKafkaConsumerParams()   # Define the Kafka producer parameters
+    kafka_consumer_params = SettingKafkaConsumerParams()   # Define the Kafka consumer parameters
     schema = DefiningRequestSchema()      # Define the schema for the incoming JSON data
     spark  = DefiningSparkSession()       # Define the Spark session with app name and Spark version

+    # default assignment for the optional parameters
+    if window_size        is None: window_size        = "60 seconds"   # default
+    if win_slide_duration is None: win_slide_duration = "30 seconds"   # default
+    if time_stamp_col     is None: time_stamp_col     = "time_stamp"   # default
+    if thresholds         is None: thresholds         = {}             # no thresholds will be applied

     try:
         # Read data from Kafka
         raw_stream_data = spark \
@@ -74,36 +74,42 @@ def SparkStreamer(kpi_list):
         parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
         # Select the parsed fields
         final_stream_data = parsed_stream_data.select("parsed_value.*")
+        # Convert the time_stamp to a proper timestamp (assuming it is in ISO format)
+        final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
         # Filter the stream to only include rows where the kpi_id is in the kpi_list
         filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
+        # Define a sliding window for aggregation
+        windowed_stream_data = filtered_stream_data \
+            .groupBy(
+                window( col(time_stamp_col),
+                        window_size, slideDuration=win_slide_duration ),
+                col("kpi_id")
+            ) \
+            .agg(*get_aggregations(oper_list))
+        # Apply thresholds to the aggregated data
+        thresholded_stream_data = apply_thresholds(windowed_stream_data, thresholds)

+        # --- This will write output to the console: FOR TESTING PURPOSES
+        # query = thresholded_stream_data \
+        #     .writeStream \
+        #     .outputMode("update") \
+        #     .format("console")

+        # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
-        query = filtered_stream_data \
+        query = thresholded_stream_data \
             .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
             .writeStream \
             .format("kafka") \
             .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
             .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
-            .option("checkpointLocation", "/home/tfs/sparkcheckpoint") \
-            .outputMode("append")
-
-        # Start the Spark streaming query and write the output to the Kafka topic
-        # query = filtered_stream_data \
-        #     .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
-        #     .writeStream \
-        #     .format("kafka") \
-        #     .option(**kafka_producer_params) \
-        #     .option("checkpointLocation", "sparkcheckpoint") \
-        #     .outputMode("append") \
-        #     .start()
-
-        # Start the Spark streaming query
-        # query = filtered_stream_data \
-        #     .writeStream \
-        #     .outputMode("append") \
-        #     .format("console")   # You can change this to other output modes or sinks
+            .option("checkpointLocation", "analytics/.spark/checkpoint") \
+            .outputMode("update")

         # Start the query execution
         query.start().awaitTermination()
     except Exception as e:
         print("Error in Spark streaming process: {:}".format(e))
         LOGGER.debug("Error in Spark streaming process: {:}".format(e))
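How the two new helpers compose, shown on a static DataFrame (an editor's sketch, not part of the commit; the module path is inferred from the test imports below, pyspark must be installed, and no Kafka broker is needed for this check). In the streaming path above, the same aggregations run per kpi_id over a sliding event-time window, by default 60-second windows sliding every 30 seconds:

    from pyspark.sql import SparkSession
    from analytics.backend.service.SparkStreaming import get_aggregations, apply_thresholds  # path assumed

    spark = SparkSession.builder.master("local[1]").appName("sketch").getOrCreate()
    df = spark.createDataFrame([("kpi-1", 10.0), ("kpi-1", 35.0)], ["kpi_id", "kpi_value"])

    agg_df = df.groupBy("kpi_id").agg(*get_aggregations(['avg', 'max']))
    # -> columns: kpi_id, avg_value (22.5), max_value (35.0)

    out_df = apply_thresholds(agg_df, {'avg_value': (20, 30), 'max_value': (45, 50)})
    # -> adds avg_value_THRESHOLD_FAIL  = False (22.5 is not < 20)
    #         avg_value_THRESHOLD_RAISE = False (22.5 is not > 30)
    #         max_value_THRESHOLD_FAIL  = True  (35.0 < 45)
    #         max_value_THRESHOLD_RAISE = False (35.0 is not > 50)
    out_df.show()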
@@ -13,3 +13,22 @@
 # limitations under the License.

+def get_kpi_id_list():
+    return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
+
+def get_operation_list():
+    return [ 'avg', 'max' ]   # possibilities: ['avg', 'min', 'max', 'first', 'last', 'stdev']
+
+def get_threshold_dict():
+    # keys must match the '<op>_value' aliases produced by get_aggregations()
+    threshold_dict = {
+        'avg_value'   : (20, 30),
+        'min_value'   : (0, 10),
+        'max_value'   : (45, 50),
+        'first_value' : (0, 10),
+        'last_value'  : (40, 50),
+        'stdev_value' : (0, 10),
+    }
+    # Filter threshold_dict based on the operation list
+    return {
+        op + '_value': threshold_dict[op + '_value'] for op in get_operation_list() if op + '_value' in threshold_dict
+    }
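With get_operation_list() returning ['avg', 'max'], only the matching keys survive the filter:

    get_threshold_dict()   # -> {'avg_value': (20, 30), 'max_value': (45, 50)}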
@@ -15,7 +15,7 @@
 import logging
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
+from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict

 LOGGER = logging.getLogger(__name__)
@@ -25,10 +25,10 @@ LOGGER = logging.getLogger(__name__)
 ###########################

 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-# def test_validate_kafka_topics():
-#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-#     response = KafkaTopic.create_all_topics()
-#     assert isinstance(response, bool)
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)

 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
......@@ -37,8 +37,11 @@ def test_RunRequestListener():
LOGGER.debug(str(response))
# def test_SparkListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.RunSparkStreamer()
# LOGGER.debug(str(response))
def test_SparkListener():
LOGGER.info('test_RunRequestListener')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.RunSparkStreamer(
get_kpi_id_list(), get_operation_list(), get_threshold_dict()
)
LOGGER.debug(str(response))
assert isinstance(response, bool)
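To run the updated tests locally, one option is pytest's Python entry point (a sketch; the test directory is inferred from the imports above, and the tests expect a reachable Kafka/Spark environment):

    import pytest
    pytest.main(["-s", "--log-cli-level=INFO", "analytics/backend/tests/"])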