From 1e90b149dbbc5e2bbb044e35954870c77925132c Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Mon, 13 Jan 2025 17:27:16 +0000
Subject: [PATCH] Initial Analytics backend and new analytics integration test.

---
 .../run_tests_locally-analytics-backend.sh    |  2 +-
 .../run_tests_locally-analytics-frontend.sh   |  2 +-
 .../service/AnalyticsBackendService.py        | 95 ++++++++++---------
 .../backend/service/AnalyzerHandlers.py       |  8 +-
 src/analytics/backend/service/Streamer.py     |  4 +-
 src/analytics/backend/tests/test_backend.py   | 35 +++++--
 6 files changed, 85 insertions(+), 61 deletions(-)

diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh
index 78fab0f76..700155a42 100755
--- a/scripts/run_tests_locally-analytics-backend.sh
+++ b/scripts/run_tests_locally-analytics-backend.sh
@@ -24,5 +24,5 @@ export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
 
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/backend/tests/test_backend.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
index 6e945406f..0cb4dc98d 100755
--- a/scripts/run_tests_locally-analytics-frontend.sh
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/frontend/tests/test_frontend.py
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 38a305aec..508feecea 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
 import json
 import logging
 import threading
+
+import pytz
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
@@ -23,7 +24,11 @@ from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
 from threading import Thread, Event
-# from .DaskStreaming import DaskStreamer
+from analytics.backend.service.Streamer import DaskStreamer
+from common.proto.analytics_frontend_pb2 import Analyzer
+from apscheduler.schedulers.background import BackgroundScheduler
+from datetime import datetime, timedelta
+
 
 LOGGER = logging.getLogger(__name__)
 
@@ -35,13 +40,18 @@ class AnalyticsBackendService(GenericGrpcService):
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
+        self.scheduler = BackgroundScheduler(daemon=True)
+        self.scheduler.start()
         self.running_threads = {}       # To keep track of all running analyzers 
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'           : 'analytics-frontend',
                                             'auto.offset.reset'  : 'latest'})
 
     def install_servicers(self):
-        threading.Thread(target=self.RequestListener, args=()).start()
+        threading.Thread(
+            target=self.RequestListener,
+            args=()
+        ).start()
 
     def RequestListener(self):
         """
@@ -69,56 +79,53 @@ class AnalyticsBackendService(GenericGrpcService):
                 # print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.StopDaskListener(analyzer_uuid)
+                    self.StopStreamer(analyzer_uuid)
                 else:
-                    self.StartDaskListener(analyzer_uuid, analyzer)
+                    self.StartStreamer(analyzer_uuid, analyzer)
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
-                # print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
 
-    def StartDaskListener(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer[ 'input_kpis'   ] 
-        thresholds    = analyzer[ 'thresholds'   ]
-        window_size   = analyzer[ 'window_size'  ]
-        window_slider = analyzer[ 'window_slider']
 
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
-            kpi_list, thresholds, window_size, window_slider))
-        # print        ("Received parameters: {:} - {:} - {:} - {:}".format(
-        #     kpi_list, thresholds, window_size, window_slider))
+    def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
+        """
+        Start the DaskStreamer with the given parameters.
+        """
         try:
-            stop_event = Event()
-            thread     = Thread(
-                target=None,  # DaskStreamer,
-                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
-                args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
-                kwargs={
-                    "window_size"       : window_size,
-                }
+            streamer = DaskStreamer(
+                analyzer_uuid,
+                analyzer['input_kpis' ],
+                analyzer['output_kpis'],
+                analyzer['thresholds' ],
+                analyzer['batch_size' ],
+                analyzer['window_size'],
             )
-            thread.start()
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            # print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            self.scheduler.add_job(
+                streamer.run,
+                'date',
+                run_date=datetime.now(pytz.utc),
+                id=analyzer_uuid,
+                replace_existing=True
+            )
+            LOGGER.info("Dask Streamer started.")
             return True
         except Exception as e:
-            # print       ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
             return False
 
-    def StopDaskListener(self, analyzer_uuid):
-        if analyzer_uuid in self.running_threads:
-            try:
-                thread, stop_event = self.running_threads[analyzer_uuid]
-                stop_event.set()
-                thread.join()
-                del self.running_threads[analyzer_uuid]
-                # print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                return True
-            except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
+    def StopStreamer(self, analyzer_uuid : str):
+        """
+        Stop the DaskStreamer with the given analyzer_uuid.
+        """
+        try:
+            active_jobs = self.scheduler.get_jobs()
+            LOGGER.debug("Active Jobs: {:}".format(active_jobs))
+            if analyzer_uuid not in [job.id for job in active_jobs]:
+                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
                 return False
-        else:
-            # print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+            self.scheduler.remove_job(analyzer_uuid)
+            LOGGER.info("Dask Streamer stopped.")
+            return True
+        except Exception as e:
+            LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
+            return False
+
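
For reference, a minimal sketch of the one-shot 'date' scheduling pattern that
StartStreamer()/StopStreamer() rely on above, assuming APScheduler's
BackgroundScheduler; run_streamer and the analyzer id are hypothetical
placeholders standing in for DaskStreamer.run and a real analyzer_uuid:

    from datetime import datetime

    import pytz
    from apscheduler.schedulers.background import BackgroundScheduler

    def run_streamer():
        # Hypothetical stand-in for DaskStreamer.run (a long-running consume loop).
        print("streamer running")

    scheduler = BackgroundScheduler(daemon=True)
    scheduler.start()

    analyzer_uuid = "example-analyzer-uuid"   # illustrative id

    # 'date' trigger: the job fires once at run_date and is then dropped from the
    # job store, so get_jobs() only lists jobs that have not fired yet.
    scheduler.add_job(run_streamer, 'date', run_date=datetime.now(pytz.utc),
                      id=analyzer_uuid, replace_existing=True)

    # Removal mirrors StopStreamer(): look the job up by id before removing it.
    if analyzer_uuid in [job.id for job in scheduler.get_jobs()]:
        scheduler.remove_job(analyzer_uuid)
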
diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py
index 34ada434b..f407de2a0 100644
--- a/src/analytics/backend/service/AnalyzerHandlers.py
+++ b/src/analytics/backend/service/AnalyzerHandlers.py
@@ -18,7 +18,7 @@ import pandas as pd
 
 
 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
 
 class AnalyzerHandlers(Enum):
     AGGREGATION_HANDLER = "AggregationHandler"
@@ -49,14 +49,14 @@ def threshold_handler(key, aggregated_df, thresholds):
             continue
         
         # Ensure the threshold values are valid (check for tuple specifically)
-        if isinstance(threshold_values, tuple) and len(threshold_values) == 2:
+        if isinstance(threshold_values, list) and len(threshold_values) == 2:
             fail_th, raise_th = threshold_values
             
             # Add threshold columns with updated naming
             aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
             aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th
         else:
-            logger.warning(f"Threshold values for '{metric_name}' are not a tuple of length 2. Skipping threshold application.")
+            logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a tuple of length 2. Skipping threshold application.")
     return aggregated_df
 
 def aggregation_handler(
@@ -71,7 +71,7 @@ def aggregation_handler(
         logger.info("Empty batch received. Skipping processing.")
         return []
     else:
-        logger.info(f"Processing {len(batch)} records for key: {key}")
+        logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
         
         # Convert data into a DataFrame
         df = pd.DataFrame(batch)
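
A self-contained sketch of the [fail_th, raise_th] list check applied by
threshold_handler() above; the metric name and threshold values are made up
for illustration:

    import pandas as pd

    # Hypothetical aggregated frame and thresholds; the two-element list shape
    # matches the isinstance(..., list) check above.
    aggregated_df = pd.DataFrame({"avg_value": [10.0, 55.0, 90.0]})
    thresholds    = {"avg_value": [20.0, 80.0]}   # [fail_th, raise_th]

    for metric_name, threshold_values in thresholds.items():
        if isinstance(threshold_values, list) and len(threshold_values) == 2:
            fail_th, raise_th = threshold_values
            aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
            aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th

    print(aggregated_df)   # row 1 sets _TH_FALL (10 < 20), row 3 sets _TH_RAISE (90 > 80)
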
diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py
index 124ef5651..cabed8588 100644
--- a/src/analytics/backend/service/Streamer.py
+++ b/src/analytics/backend/service/Streamer.py
@@ -24,7 +24,7 @@ from .AnalyzerHelper import AnalyzerHelper
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
 
 
 class DaskStreamer:
@@ -59,7 +59,7 @@ class DaskStreamer:
                 if not self.running:
                     logger.warning("Dask Streamer is not running. Exiting loop.")
                     break
-                message = self.consumer.poll(timeout=2.0)
+                message = self.consumer.poll(timeout=2.0)   # Poll for new messages with a 2-second timeout
                 if message is None:
                     # logger.info("No new messages received.")
                     continue
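
A standalone sketch of the poll-with-timeout loop that DaskStreamer.run() uses,
assuming confluent-kafka; the broker address, topic, and group id are
placeholders, and the loop is bounded here where the streamer loops on
self.running:

    from confluent_kafka import Consumer, KafkaError

    consumer = Consumer({'bootstrap.servers': '127.0.0.1:9092',   # placeholder broker
                         'group.id'         : 'example-group',
                         'auto.offset.reset': 'latest'})
    consumer.subscribe(['example_topic'])                         # placeholder topic

    for _ in range(5):                        # bounded for the sketch
        message = consumer.poll(timeout=2.0)  # returns None if nothing arrives within 2 seconds
        if message is None:
            continue
        if message.error():
            if message.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error:", message.error())
            continue
        print("Received:", message.value().decode('utf-8'))

    consumer.close()
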
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index cc0167399..09be90e4c 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 import pytest
 import logging
 import pandas as pd
@@ -21,19 +22,11 @@ from analytics.backend.service.Streamer import DaskStreamer
 from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
                                     get_windows_size, get_batch_size, get_agg_df
 from analytics.backend.service.AnalyzerHandlers import aggregation_handler, threshold_handler
+from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
 
-# --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    logger.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
-
-###########################
-# Tests Implementation of Telemetry Backend
-###########################
 
 @pytest.fixture(autouse=True)
 def log_all_methods(request):
@@ -45,6 +38,30 @@ def log_all_methods(request):
     yield
     logger.info(f" <<< Finished test: {request.node.name} <<< ")
 
+
+# --- "test_validate_kafka_topics" should be run before the functionality tests ---
+def test_validate_kafka_topics():
+    logger.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+###########################
+# Integration test of Streamer with the backend service
+###########################
+
+def test_backend_integration_with_analyzer():
+    backendServiceObject = AnalyticsBackendService()
+    backendServiceObject.install_servicers()
+    logger.info(" waiting for 2 minutes for the backend service before termination  ... ")
+    time.sleep(120)
+    backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
+    logger.info(" Backend service terminated successfully ... ")
+
+
+###########################
+# Functionality tests for analyzer sub-methods
+###########################
+
 @pytest.fixture
 def dask_streamer():
     with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client')    as mock_dask_client, \
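
During the 2-minute window the integration test leaves the RequestListener
running, so an analyzer start request can be injected on the analytics request
topic. A hedged sketch of such a request, assuming confluent-kafka and the
field names read by StartStreamer(); the topic name, broker address, KPI ids,
and field values are placeholders (sending algo_name and oper_mode as null
would trigger StopStreamer instead):

    import json
    from confluent_kafka import Producer

    analyzer_uuid = "efef4d95-1cf1-43c4-9742-95c283ddd666"
    analyzer = {                                # field names as read by StartStreamer()
        "algo_name"  : "AggregationHandler",    # placeholder; any non-None value starts a streamer
        "oper_mode"  : 1,
        "input_kpis" : ["kpi-uuid-1", "kpi-uuid-2"],
        "output_kpis": ["kpi-uuid-out"],
        "thresholds" : {"avg_value": [10.0, 90.0]},
        "batch_size" : 5,
        "window_size": 10,
    }

    producer = Producer({'bootstrap.servers': '127.0.0.1:9092'})   # placeholder broker
    producer.produce('analytics_request',                          # placeholder topic name
                     key=analyzer_uuid, value=json.dumps(analyzer))
    producer.flush()
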
-- 
GitLab