From 067ace160dcef4b3d07923bcece19bfef3f50a10 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Tue, 3 Sep 2024 13:54:24 +0000
Subject: [PATCH] Add Kafka request listener and KPI filtering to the Analytics backend

- Added a Kafka request listener that listens for requests from the Analytics Frontend.
- Updated `SparkStreaming.py` to consume and process the stream from the VALUE topic, filtering rows to KPIs present in the `input_kpis` list.
- Switched the frontend producer topic from `VALUE` to `ANALYTICS_REQUEST`.
- Updated test messages to keep the KPI UUIDs consistent with those generated by the Telemetry Backend service.
---
 .../service/AnalyticsBackendService.py        | 42 +++++++++++++++++--
 .../backend/service/SparkStreaming.py         | 19 +++++----
 src/analytics/backend/tests/test_backend.py   | 11 ++++-
 .../AnalyticsFrontendServiceServicerImpl.py   |  4 +-
 src/analytics/frontend/tests/messages.py      |  4 ++
 .../backend/tests/test_TelemetryBackend.py    |  8 ++--
 6 files changed, 67 insertions(+), 21 deletions(-)
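
For quick manual testing, a minimal sketch of producing an analyzer request that
the new listener picks up. The broker address, the literal topic string, and the
payload values are assumptions for illustration; the payload keys mirror
AnalyticsFrontendServiceServicerImpl.py below.

    import json, uuid
    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})  # assumed local broker
    request  = {
        "algo_name"   : "threshold",                              # hypothetical analyzer
        "input_kpis"  : ["1e22f180-ba28-4641-b190-2287bf446666"],
        "output_kpis" : [],
        "oper_mode"   : 1,
    }
    producer.produce('analytics_request', key=str(uuid.uuid4()),  # topic string assumed
                     value=json.dumps(request))
    producer.flush()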

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 84f1887c6..5331d027d 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,18 +12,52 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import threading
 
+import json
+import logging
+import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from analytics.backend.service.SparkStreaming import SparkStreamer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
 
+LOGGER             = logging.getLogger(__name__)
 
 class AnalyticsBackendService(GenericGrpcService):
     """
     Class listens for analyzer requests on Kafka and starts Spark streamers for the requested KPIs.
     """
     def __init__(self, cls_name : str = __name__) -> None:
-        pass
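+        # Consumer for analyzer requests; with 'auto.offset.reset' set to
+        # 'latest', only requests produced after start-up are received.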
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                             'group.id'          : 'analytics-frontend',
+                                             'auto.offset.reset' : 'latest'})
+
+    def RunSparkStreamer(self, kpi_list):
+        threading.Thread(target=SparkStreamer, args=(kpi_list,)).start()
+
+    def RunRequestListener(self) -> bool:
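+        # Spawn the listener in a background thread so the caller is not blocked.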
+        threading.Thread(target=self.RequestListener).start()
+        return True
 
-    def RunSparkStreamer(self):
-        threading.Thread(target=SparkStreamer).start()
+    def RequestListener(self):
+        """
+        listener for requests on Kafka topic.
+        """
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
+        while True:
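+            # Wait up to 2 seconds for the next request message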
+            receive_msg = consumer.poll(2.0)
+            if receive_msg is None:
+                continue
+            elif receive_msg.error():
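+                # Reaching the end of a partition is informational, not an error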
+                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
+                    break
+            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
+            analyzer_id = receive_msg.key().decode('utf-8')
+            LOGGER.debug('Received Analyzer: {:} - {:} - {:}'.format(analyzer_id, analyzer, analyzer['input_kpis']))
+            self.RunSparkStreamer(analyzer['input_kpis'])                   # TODO: Add active analyzer to list
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 92cc9a842..f42618a1c 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -15,7 +15,7 @@
 
 import logging
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 from pyspark.sql.functions import from_json, col
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 
@@ -32,20 +32,19 @@ def SettingKafkaParameters():   # TODO:  create get_kafka_consumer() in common w
     return {
             # "kafka.bootstrap.servers": '127.0.0.1:9092',
             "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
-            "subscribe"              : KafkaTopic.ANALYTICS_REQUEST.value,
-            "startingOffsets"        : 'latest',            
+            "subscribe"              : KafkaTopic.VALUE.value,
+            "startingOffsets"        : 'latest',
             "failOnDataLoss"         : 'false'              # Optional: Set to "true" to fail the query on data loss
         }
 
 def DefiningRequestSchema():
     return StructType([
-            StructField("algo_name",   StringType()           , True),
-            StructField("input_kpis",  ArrayType(StringType()), True),
-            StructField("output_kpis", ArrayType(StringType()), True),
-            StructField("oper_mode",   IntegerType()          , True)
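+            # Shape of one KPI sample on the VALUE topic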
+            StructField("time_stamp", StringType(), True),
+            StructField("kpi_id",     StringType(), True),
+            StructField("kpi_value",  DoubleType(), True)
         ])
 
-def SparkStreamer():
+def SparkStreamer(kpi_list):
     """
     Method to perform Spark operations on a Kafka stream.
     NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
@@ -68,9 +67,11 @@ def SparkStreamer():
         parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
         # Select the parsed fields
         final_stream_data  = parsed_stream_data.select("parsed_value.*")
+        # Filter the stream to only include rows where the kpi_id is in the kpi_list
+        filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
 
         # Start the Spark streaming query
-        query = final_stream_data \
+        query = filtered_stream_data \
             .writeStream \
             .outputMode("append") \
             .format("console")              # You can change this to other output modes or sinks
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 7a6175ecf..426c89e54 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -30,8 +30,15 @@ LOGGER = logging.getLogger(__name__)
 #     response = KafkaTopic.create_all_topics()
 #     assert isinstance(response, bool)
 
-def test_SparkListener():
+def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
     AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.RunSparkStreamer()
+    response = AnalyticsBackendServiceObj.RunRequestListener()
     LOGGER.debug(str(response))
+
+
+# def test_SparkListener():
+#     LOGGER.info('test_SparkListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer()
+#     LOGGER.debug(str(response))
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index ccbef3599..0f9f4e146 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -70,7 +70,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             "oper_mode"   : analyzer_obj.operation_mode
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_generate),
             callback = self.delivery_callback
@@ -107,7 +107,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             "oper_mode"   : -1
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_stop),
             callback = self.delivery_callback
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 4c826e5c3..0a8300436 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -32,6 +32,10 @@ def create_analyzer():
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
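+    # Fixed UUIDs below must match the KPI IDs generated by the Telemetry
+    # backend service (see commit message); the remaining IDs stay random.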
+    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _create_analyzer.input_kpi_ids.append(_kpi_id)
diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py
index a2bbee540..24a1b35cc 100644
--- a/src/telemetry/backend/tests/test_TelemetryBackend.py
+++ b/src/telemetry/backend/tests/test_TelemetryBackend.py
@@ -25,10 +25,10 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
 
 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
-- 
GitLab