From 88d7a20007565a66f7561ec9e9a7811db1c9b0fe Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Sat, 7 Sep 2024 05:56:22 +0000
Subject: [PATCH] Changes in Analytics Service.

- Thresholds, window_size, and window_slider are added to the Frontend and Backend.
---
 .../service/AnalyticsBackendService.py        | 24 ++++++++++++-------
 .../backend/service/SparkStreaming.py         |  3 ++-
 src/analytics/backend/tests/messages.py       |  2 +-
 .../AnalyticsFrontendServiceServicerImpl.py   | 12 ++++++----
 src/analytics/frontend/tests/messages.py      |  8 +++----
 5 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 2842e2374..4784ef051 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -22,7 +22,7 @@ from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import KafkaError
 
-LOGGER             = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
 
 class AnalyticsBackendService(GenericGrpcService):
     """
@@ -33,11 +33,18 @@ class AnalyticsBackendService(GenericGrpcService):
                                             'group.id'           : 'analytics-frontend',
                                             'auto.offset.reset'  : 'latest'})
 
-    def RunSparkStreamer(self, kpi_list, oper_list, thresholds_dict):
-        print ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
-        LOGGER.debug ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict))
+    def RunSparkStreamer(self, analyzer):
+        kpi_list      = analyzer['input_kpis']
+        oper_list     = [s.replace('_value', '') for s in analyzer["thresholds"].keys()]
+        thresholds    = analyzer['thresholds']
+        window_size   = analyzer['window_size']
+        window_slider = analyzer['window_slider']
+        print("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+            kpi_list, oper_list, thresholds, window_size, window_slider))
+        LOGGER.debug("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+            kpi_list, oper_list, thresholds, window_size, window_slider))
         threading.Thread(target=SparkStreamer, 
-                         args=(kpi_list, oper_list, None, None, thresholds_dict, None)
+                         args=(kpi_list, oper_list, thresholds, window_size, window_slider, None)
                          ).start()
         return True
 
@@ -63,6 +70,7 @@ class AnalyticsBackendService(GenericGrpcService):
                     break
             analyzer    = json.loads(receive_msg.value().decode('utf-8'))
             analyzer_id = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Collector: {:} - {:}'.format(analyzer_id, analyzer))
-            print('Recevied Collector: {:} - {:} - {:}'.format(analyzer_id, analyzer, analyzer['input_kpis']))
-            self.RunSparkStreamer(analyzer['input_kpis'])                   # TODO: Add active analyzer to list
+            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
+            print('Received Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
+            # TODO: Add active analyzer to list
+            self.RunSparkStreamer(analyzer)
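
For context, the backend now receives the whole analyzer dict rather than only input_kpis. Below is a minimal sketch of the payload shape RunSparkStreamer expects after json.loads; the field values are illustrative, not taken from this patch, and note that JSON serialization turns the frontend's threshold tuples into lists:

    # Illustrative analyzer message as consumed from Kafka; values are examples.
    import json

    raw = ('{"algo_name": "threshold", "input_kpis": ["6e22f180-ba28-4641-b190-2287bf448888"],'
           ' "output_kpis": [], "oper_mode": 1,'
           ' "thresholds": {"first_value": [0, 10], "last_value": [40, 50]},'
           ' "window_size": "60 seconds", "window_slider": "30 seconds"}')
    analyzer = json.loads(raw)

    # Mirrors RunSparkStreamer: operation names are the threshold keys with
    # the '_value' suffix stripped -> ['first', 'last'].
    oper_list = [s.replace('_value', '') for s in analyzer["thresholds"].keys()]
    assert oper_list == ['first', 'last']
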
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 175222b59..11ec9fe5f 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -73,7 +73,8 @@ def ApplyThresholds(aggregated_df, thresholds):
         )
     return aggregated_df
 
-def SparkStreamer(kpi_list, oper_list, thresholds, window_size=None, win_slide_duration=None, time_stamp_col=None):
+def SparkStreamer(kpi_list, oper_list, thresholds, 
+                  window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
     Method to perform Spark operations on a Kafka stream.
     NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
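
The new window_size/win_slide_duration arguments presumably feed PySpark's window() helper; here is a minimal batch-mode sketch under that assumption (column names and data are illustrative, not the patched file's actual body):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg, col, window

    spark = SparkSession.builder.appName("window-sketch").getOrCreate()
    df = spark.createDataFrame(
        [("2024-09-07 05:56:00", 42.0), ("2024-09-07 05:56:45", 48.0)],
        ["time_stamp", "kpi_value"],
    )
    # window(timeColumn, windowDuration, slideDuration): overlapping 60-second
    # buckets that slide every 30 seconds, matching the test parameters below.
    agg = (df.groupBy(window(col("time_stamp").cast("timestamp"),
                             "60 seconds", "30 seconds"))
             .agg(avg("kpi_value").alias("avg_value")))
    agg.show(truncate=False)
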
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index c4a26a1ac..9acd6ad9d 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -26,7 +26,7 @@ def get_threshold_dict():
         'max_value'    : (45, 50),
         'first_value'  : (00, 10),
         'last_value'   : (40, 50),
-        'stddev_value' : (00, 10),
+        'stdev_value'  : (00, 10),
     }
     # Filter threshold_dict based on the operation_list
     return {
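
The hunk ends just before the dict comprehension that applies the filter; a hypothetical standalone version of that idiom follows (function name and signature are assumptions, not the test file's actual code):

    # Hypothetical helper: keep only the '<op>_value' entries whose operation
    # appears in operation_list.
    def filter_thresholds(threshold_dict, operation_list):
        return {
            op + '_value': threshold_dict[op + '_value']
            for op in operation_list
            if op + '_value' in threshold_dict
        }

    print(filter_thresholds({'min_value': (0, 10), 'max_value': (45, 50)}, ['max']))
    # -> {'max_value': (45, 50)}
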
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 0f9f4e146..2671bfb13 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -64,10 +64,14 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         """
         analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
         analyzer_to_generate : Dict = {
-            "algo_name"   : analyzer_obj.algorithm_name,
-            "input_kpis"  : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
-            "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
-            "oper_mode"   : analyzer_obj.operation_mode
+            "algo_name"       : analyzer_obj.algorithm_name,
+            "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+            "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+            "oper_mode"       : analyzer_obj.operation_mode,
+            "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+            "window_size"     : analyzer_obj.parameters["window_size"],
+            "window_slider"   : analyzer_obj.parameters["window_slider"],
+            # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
         }
         self.kafka_producer.produce(
             KafkaTopic.ANALYTICS_REQUEST.value,
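
Downstream of this dict, the produce call (its tail is truncated in this hunk) serializes the dict to JSON so the backend's json.loads() can reverse it. A sketch of that key/value shape, where the literal topic name, key, and flush() are assumptions for illustration:

    import json
    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})
    analyzer_to_generate = {
        "thresholds"    : {"first_value": (0, 10)},   # tuples serialize as JSON arrays
        "window_size"   : "60 seconds",
        "window_slider" : "30 seconds",
    }
    producer.produce(
        'analytics_request',            # i.e. KafkaTopic.ANALYTICS_REQUEST.value
        key   = '6e22f180-ba28-4641-b190-2287bf448888',
        value = json.dumps(analyzer_to_generate),
    )
    producer.flush()
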
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 4ffbb0b8e..180fac1f8 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -33,10 +33,10 @@ def create_analyzer():
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
+    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _create_analyzer.input_kpi_ids.append(_kpi_id)
@@ -47,8 +47,8 @@ def create_analyzer():
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     # parameter
     _threshold_dict = {
-        'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
-        'first_value' :(00, 10), 'last_value'  :(40, 50), 'stddev_value':(00, 10)}
+        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value' :(00, 10), 'last_value'  :(40, 50), 'stdev_value' :(00, 10)}
     _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
     _create_analyzer.parameters['window_size']     = "60 seconds"     # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
     _create_analyzer.parameters['window_slider']   = "30 seconds"     # should be less than window size
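
Because analyzer parameters are carried as strings, the thresholds dict is JSON-encoded here and decoded again in the servicer above; a standalone round-trip sketch (names illustrative):

    import json

    _threshold_dict = {'first_value': (00, 10), 'last_value': (40, 50), 'stdev_value': (00, 10)}
    parameters = {}
    parameters['thresholds']    = json.dumps(_threshold_dict)
    parameters['window_size']   = "60 seconds"
    parameters['window_slider'] = "30 seconds"     # should be less than window_size

    # Servicer side (AnalyticsFrontendServiceServicerImpl) reverses it:
    thresholds = json.loads(parameters['thresholds'])
    assert thresholds['last_value'] == [40, 50]    # tuples come back as lists
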
-- 
GitLab