From 8ac130c10dade4d58a1de43382d046a203a80af5 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Mon, 9 Sep 2024 11:38:19 +0000
Subject: [PATCH] Analytics: add duration_s, listener start/stop control, and analyzer selection

Proto:
- Added a `duration_s` field to the `Analyzer` message (later fields renumbered); see the sketch below.
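
A minimal usage sketch with the regenerated Python stubs (assuming
`analytics_frontend_pb2` has been rebuilt from this proto):

    from common.proto.analytics_frontend_pb2 import Analyzer

    analyzer            = Analyzer()
    analyzer.duration_s = 30.0  # stop the analytics thread after 30 s; 0 = run indefinitely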

Frontend:
- Implemented the `SelectAnalyzers` selection logic (sketch after this list).
- Improved message formatting in the `create_analyzer_filter()` function.
- Added a test case: `test_SelectAnalytics`.
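
Selection sketch, mirroring `test_SelectAnalytics` (the
`analyticsFrontend_client` gRPC fixture is assumed, as set up in the test suite):

    from common.proto.analytics_frontend_pb2 import AnalyzerList
    from analytics.frontend.tests.messages   import create_analyzer_filter

    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
    assert isinstance(response, AnalyzerList)  # analyzers matching the filter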

Backend:
- Renamed the `RunSparkStreamer` method to `StartSparkStreamer`.
- Renamed `RunRequestListener` to `StartRequestListener`; it now returns `(thread, stop_event)`.
- Added a `StopRequestListener` method that sets the stop event and joins the listener thread (lifecycle sketch below).
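
Listener lifecycle sketch (a minimal outline; error handling omitted):

    from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService

    service            = AnalyticsBackendService()
    thread, stop_event = service.StartRequestListener()  # spawns RequestListener(stop_event)
    ...                                                  # listener consumes ANALYTICS_REQUEST messages
    service.StopRequestListener((thread, stop_event))    # sets the event and joins the thread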

Database:
- Implemented the filtering logic in the `select_with_filter` method (analyzer IDs, algorithm names, input/output KPI IDs).
- Updated the `ConvertRowToAnalyzer` method to correctly read the `operation_mode` enum value (sketch below).
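
Sketch of the frontend consuming the new DB path (names as used inside
`AnalyticsFrontendServiceServicerImpl.SelectAnalyzers`):

    # select_with_filter matches on analyzer_id, algorithm_names and
    # input/output KPI ids; ConvertRowToAnalyzer reads operation_mode.value
    rows      = self.db_obj.select_with_filter(AnalyzerModel, filter)
    analyzers = [AnalyzerModel.ConvertRowToAnalyzer(row) for row in rows]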
---
 proto/analytics_frontend.proto                | 18 +++++-----
 .../service/AnalyticsBackendService.py        | 34 ++++++++++++++-----
 .../backend/service/SparkStreaming.py         |  3 +-
 src/analytics/backend/tests/test_backend.py   | 21 ++++++++++--
 src/analytics/database/AnalyzerModel.py       |  2 +-
 src/analytics/database/Analyzer_DB.py         | 19 +++++++++--
 .../AnalyticsFrontendServiceServicerImpl.py   | 19 ++++++++---
 src/analytics/frontend/tests/messages.py      | 25 +++++++-------
 src/analytics/frontend/tests/test_frontend.py | 21 ++++++++----
 9 files changed, 115 insertions(+), 47 deletions(-)

diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index bc7420d54..ace0581db 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -35,18 +35,20 @@ enum AnalyzerOperationMode {
   ANALYZEROPERATIONMODE_STREAMING   = 2;
 }
 
+// The duration_s field bounds how long the analyzer runs.
 message Analyzer {
   AnalyzerId                 analyzer_id          = 1;
   string                     algorithm_name       = 2;  // The algorithm to be executed
-  repeated kpi_manager.KpiId input_kpi_ids        = 3;  // The KPI Ids to be processed by the analyzer
-  repeated kpi_manager.KpiId output_kpi_ids       = 4;  // The KPI Ids produced by the analyzer
-  AnalyzerOperationMode      operation_mode       = 5;  // Operation mode of the analyzer
-  map<string, string>        parameters           = 6; 
+  float                      duration_s           = 3;  // Terminate the data analytics thread after this duration (seconds); 0 = run indefinitely
+  repeated kpi_manager.KpiId input_kpi_ids        = 4;  // The KPI Ids to be processed by the analyzer
+  repeated kpi_manager.KpiId output_kpi_ids       = 5;  // The KPI Ids produced by the analyzer
+  AnalyzerOperationMode      operation_mode       = 6;  // Operation mode of the analyzer
+  map<string, string>        parameters           = 7;  // Dictionary of (key, value) pairs, e.g. (window_size, 10)
   // In batch mode... 
-  float                      batch_min_duration_s = 7;  // ..., min duration to collect before executing batch
-  float                      batch_max_duration_s = 8;  // ..., max duration collected to execute the batch
-  uint64                     batch_min_size       = 9;  // ..., min number of samples to collect before executing batch
-  uint64                     batch_max_size       = 10; // ..., max number of samples collected to execute the batch
+  float                      batch_min_duration_s = 8;  // ..., min duration to collect before executing batch
+  float                      batch_max_duration_s = 9;  // ..., max duration collected to execute the batch
+  uint64                     batch_min_size       = 10; // ..., min number of samples to collect before executing batch
+  uint64                     batch_max_size       = 11; // ..., max number of samples collected to execute the batch
 }
 
 message AnalyzerFilter {
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 899525fc6..463442f82 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService):
                                             'group.id'           : 'analytics-frontend',
                                             'auto.offset.reset'  : 'latest'})
 
-    def RunSparkStreamer(self, analyzer_id, analyzer):
+    def StartSparkStreamer(self, analyzer_id, analyzer):
         kpi_list      = analyzer['input_kpis'] 
         oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
         thresholds    = analyzer['thresholds']
@@ -59,17 +59,33 @@ class AnalyticsBackendService(GenericGrpcService):
             LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
             return False
 
-    def RunRequestListener(self)->bool:
-        threading.Thread(target=self.RequestListener).start()
-        return True
+    def StopRequestListener(self, threadInfo: tuple):
+        try:
+            thread, stop_event = threadInfo
+            stop_event.set()
+            thread.join()
+            print      ("Terminating Analytics backend RequestListener")
+            LOGGER.info("Terminating Analytics backend RequestListener")
+            return True
+        except Exception as e:
+            print       ("Failed to terminate analytics backend {:}".format(e))
+            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
+            return False
+
+    def StartRequestListener(self)->tuple:
+        stop_event = threading.Event()
+        thread = threading.Thread(target=self.RequestListener,
+                                  args=(stop_event,) )
+        thread.start()
+        return (thread, stop_event)
 
-    def RequestListener(self):
+    def RequestListener(self, stop_event):
         """
         listener for requests on Kafka topic.
         """
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
-        while True:
+        while not stop_event.is_set():
             receive_msg = consumer.poll(2.0)
             if receive_msg is None:
                 continue
@@ -87,7 +103,9 @@ class AnalyticsBackendService(GenericGrpcService):
             if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
                 self.TerminateAnalyzerBackend(analyzer_uuid)
             else:
-                self.RunSparkStreamer(analyzer_uuid, analyzer)
+                self.StartSparkStreamer(analyzer_uuid, analyzer)
+        LOGGER.debug("Stop Event activated. Terminating...")
+        print       ("Stop Event activated. Terminating...")
 
     def TerminateAnalyzerBackend(self, analyzer_uuid):
         if analyzer_uuid in self.running_threads:
@@ -104,5 +122,5 @@ class AnalyticsBackendService(GenericGrpcService):
                 return False
         else:
             print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            # LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
+            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
             # generate confirmation towards frontend
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 202076ed5..eaabcfed2 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -142,7 +142,8 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
         # Loop to check for stop event flag. To be set by stop collector method.
         while True:
             if stop_event.is_set():
-                print ("Stop Event activated. Terminating in 5 seconds...")
+                LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
+                print       ("Stop Event activated. Terminating in 5 seconds...")
                 time.sleep(5)
                 queryHandler.stop()
                 break
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 9e8a0832d..c3e00df35 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 import logging
+import threading
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
@@ -30,12 +32,25 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_RunRequestListener():
+def test_StartRequestListener():
     LOGGER.info('test_RunRequestListener')
     AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.RunRequestListener()
-    LOGGER.debug(str(response))
+    response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+    LOGGER.debug(str(response)) 
+    assert isinstance(response, tuple)
 
+def test_StopRequestListener():
+    LOGGER.info('test_StopRequestListener')
+    LOGGER.info('Initiating StartRequestListener...')
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+    # LOGGER.debug(str(response_thread))
+    time.sleep(10)
+    LOGGER.info('Initiating StopRequestListener...')
+    # reuse the same service object; re-instantiating it here would orphan the running listener
+    response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
+    LOGGER.debug(str(response)) 
+    assert isinstance(response, bool)
 
 def test_SparkListener():
     LOGGER.info('test_RunRequestListener')
diff --git a/src/analytics/database/AnalyzerModel.py b/src/analytics/database/AnalyzerModel.py
index 783205269..c33e396e0 100644
--- a/src/analytics/database/AnalyzerModel.py
+++ b/src/analytics/database/AnalyzerModel.py
@@ -87,7 +87,7 @@ class Analyzer(Base):
         response                              = analytics_frontend_pb2.Analyzer()
         response.analyzer_id.analyzer_id.uuid = row.analyzer_id
         response.algorithm_name               = row.algorithm_name
-        response.operation_mode               = row.operation_mode
+        response.operation_mode               = row.operation_mode.value
         response.parameters.update(row.parameters)
         
         for input_kpi_id in row.input_kpi_ids:
diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py
index 896ba1100..1ba68989a 100644
--- a/src/analytics/database/Analyzer_DB.py
+++ b/src/analytics/database/Analyzer_DB.py
@@ -15,7 +15,7 @@
 import logging
 import sqlalchemy_utils
 
-from sqlalchemy     import inspect
+from sqlalchemy     import inspect, or_
 from sqlalchemy.orm import sessionmaker
 
 from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
@@ -120,9 +120,22 @@ class AnalyzerDB:
         session = self.Session()
         try:
             query = session.query(AnalyzerModel)
+            
             # Apply filters based on the filter_object
-            if filter_object.kpi_id:
-                query = query.filter(AnalyzerModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))  # Need to be updated
+            if filter_object.analyzer_id:
+                query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
+
+            if filter_object.algorithm_names:
+                query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
+
+            if filter_object.input_kpi_ids:
+                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
+                query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
+
+            if filter_object.output_kpi_ids:
+                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
+                query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
+
             result = query.all()
             # query should be added to return all rows
             if result:
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 9c438761c..f35f035e2 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -126,12 +126,23 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def SelectAnalyzers(self, 
-                         request : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
+                         filter : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
                         ) -> AnalyzerList:  # type: ignore
-        LOGGER.info("At Service gRPC message: {:}".format(request))
+        LOGGER.info("At Service gRPC message: {:}".format(filter))
         response = AnalyzerList()
-
-        return response
+        try:
+            rows = self.db_obj.select_with_filter(AnalyzerModel, filter)
+            try:
+                for row in rows:
+                    response.analyzer_list.append(
+                        AnalyzerModel.ConvertRowToAnalyzer(row)
+                    )
+                return response
+            except Exception as e:
+                LOGGER.warning('Unable to process filter response {:}'.format(e))
+        except Exception as e:
+            LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
+       
 
     def delivery_callback(self, err, msg):
         if err:
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 180fac1f8..646de962e 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -21,12 +21,13 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
 def create_analyzer_id():
     _create_analyzer_id                  = AnalyzerId()
     # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer_id.analyzer_id.uuid = "9baa306d-d91c-48c2-a92f-76a21bab35b6"
+    _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
     return _create_analyzer_id
 
 def create_analyzer():
     _create_analyzer                              = Analyzer()
-    _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
@@ -60,24 +61,24 @@ def create_analyzer_filter():
     _create_analyzer_filter           = AnalyzerFilter()
 
     _analyzer_id_obj                  = AnalyzerId()
-    _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
+    # _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
+    _analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
     _create_analyzer_filter.analyzer_id.append(_analyzer_id_obj)
 
-    _create_analyzer_filter.algorithm_names.append('Algorithum1')
+    _create_analyzer_filter.algorithm_names.append('Test_Aggergate_and_Threshold')
 
-    _input_kpi_id_obj                 = KpiId()
-    _input_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
-    _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
+    # _input_kpi_id_obj                 = KpiId()
+    # _input_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
+    # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
     # another input kpi Id
     # _input_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
     # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
 
-    _output_kpi_id_obj                = KpiId()
-    _output_kpi_id_obj.kpi_id.uuid    = str(uuid.uuid4())
-    _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
-    # another output kpi Id
+    # _output_kpi_id_obj                = KpiId()
+    # _output_kpi_id_obj.kpi_id.uuid    = str(uuid.uuid4())
+    # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
+    # # another output kpi Id
     # _output_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
     # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj)
 
     return _create_analyzer_filter
-
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index df6ce165e..b96116d29 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -21,10 +21,11 @@ from common.proto.context_pb2 import Empty
 from common.Settings          import ( get_service_port_grpc, get_env_var_name, 
                                       ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
 
+from common.tools.kafka.Variables                        import KafkaTopic
 from common.proto.analytics_frontend_pb2                 import AnalyzerId, AnalyzerList
 from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
-from analytics.frontend.tests.messages                    import ( create_analyzer_id, create_analyzer,
+from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
                                                                  create_analyzer_filter )
 
 ###########################
@@ -75,6 +76,12 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 # Tests Implementation of Analytics Frontend
 ###########################
 
+# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
 # ----- core funtionality test -----
 def test_StartAnalytics(analyticsFrontend_client):
     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
@@ -82,14 +89,14 @@ def test_StartAnalytics(analyticsFrontend_client):
     LOGGER.debug(str(response))
     assert isinstance(response, AnalyzerId)
 
+def test_SelectAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+    LOGGER.debug(str(response))
+    assert isinstance(response, AnalyzerList)
+
 def test_StopAnalytic(analyticsFrontend_client):
     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
     LOGGER.debug(str(response))
     assert isinstance(response, Empty)
-
-def test_SelectAnalytics(analyticsFrontend_client):
-    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
-    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
-    LOGGER.debug(str(response))
-    assert isinstance(response, AnalyzerList)
\ No newline at end of file
-- 
GitLab