diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index f42618a1ca5f9e52290f12a1f1eacaec480b293b..245a77d80a5e3729f273ce927bdf7d2acd8f2964 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -14,9 +14,9 @@
 
 
 import logging
-from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, DoubleType
-from pyspark.sql.functions import from_json, col
+from pyspark.sql                  import SparkSession
+from pyspark.sql.types            import StructType, StructField, StringType, DoubleType
+from pyspark.sql.functions        import from_json, col
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 
 LOGGER = logging.getLogger(__name__)
@@ -28,7 +28,7 @@ def DefiningSparkSession():
             .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
             .getOrCreate()
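+            # NOTE: the spark-sql-kafka package above is fetched when the session
+            # starts, and its artifact version must match the running Spark/Scala build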
 
-def SettingKafkaParameters():   # TODO:  create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
+def SettingKafkaConsumerParams():   # TODO:  create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
     return {
             # "kafka.bootstrap.servers": '127.0.0.1:9092',
             "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
@@ -44,37 +44,63 @@ def DefiningRequestSchema():
             StructField("kpi_value"  ,  DoubleType()  , True)
         ])
 
+def SettingKafkaProducerParams():
+    return {
+            "kafka.bootstrap.servers" : KafkaConfig.get_kafka_address(),
+            "topic"                   : KafkaTopic.ANALYTICS_RESPONSE.value
+    }
+
 def SparkStreamer(kpi_list):
     """
     Method to perform the Spark streaming operation on a Kafka stream.
     NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
     """
-    kafka_params = SettingKafkaParameters()         # Define the Kafka parameters
-    schema       = DefiningRequestSchema()          # Define the schema for the incoming JSON data
-    spark        = DefiningSparkSession()           # Define the spark session with app name and spark version
+    kafka_producer_params = SettingKafkaProducerParams()         # Define the Kafka producer parameters
+    kafka_consumer_params = SettingKafkaConsumerParams()         # Define the Kafka consumer parameters
+    schema                = DefiningRequestSchema()              # Define the schema for the incoming JSON data
+    spark                 = DefiningSparkSession()               # Define the spark session with app name and spark version
 
     try:
         # Read data from Kafka
         raw_stream_data = spark \
             .readStream \
             .format("kafka") \
-            .options(**kafka_params) \
+            .options(**kafka_consumer_params) \
             .load()
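+        # Rows from the Kafka source carry binary "key"/"value" columns plus
+        # topic, partition, offset and timestamp metadata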
 
         # Convert the value column from Kafka to a string
-        stream_data        = raw_stream_data.selectExpr("CAST(value AS STRING)")
+        stream_data          = raw_stream_data.selectExpr("CAST(value AS STRING)")
         # Parse the JSON string into a DataFrame with the defined schema
-        parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
+        parsed_stream_data   = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
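+        # e.g. a value of '{"kpi_id": "10", "kpi_value": 9.5}' (assumed message
+        # shape) parses into a struct column with fields kpi_id and kpi_value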
         # Select the parsed fields
-        final_stream_data  = parsed_stream_data.select("parsed_value.*")
+        final_stream_data    = parsed_stream_data.select("parsed_value.*")
         # Filter the stream to only include rows where the kpi_id is in the kpi_list
         filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
 
-        # Start the Spark streaming query
+        # Re-key each row by kpi_id, serialize it back to JSON, and write the
+        # stream to the Kafka response topic
         query = filtered_stream_data \
+            .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
             .writeStream \
-            .outputMode("append") \
-            .format("console")              # You can change this to other output modes or sinks
+            .format("kafka") \
+            .options(**kafka_producer_params) \
+            .option("checkpointLocation", "/home/tfs/sparkcheckpoint") \
+            .outputMode("append")
 
         # Start the query execution
         query.start().awaitTermination()
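+        # start() launches the continuous query; awaitTermination() blocks the
+        # calling thread until the query stops or fails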