From c00595043dab3c664d7b0adfd167efc6a842402f Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Thu, 19 Sep 2024 21:50:14 +0000
Subject: [PATCH] Changes to Analytics Frontend for Managing Stream Response:

- Added `KpiValue` message type in `AnalyticsFrontend.proto`.
- Changed `StartAnalyzer` response type from message to stream.
- Added `StartResponseListener()` call and response handler in the `StartAnalyzer` method.
- Implemented `ConvertValueToKpiValueType` to handle `OneOf` type conversion.
- Added tests to handle stream responses.
---
 proto/analytics_frontend.proto                |  3 +-
 .../AnalyticsFrontendServiceServicerImpl.py   | 36 ++++++++++++++-----
 src/analytics/frontend/tests/test_frontend.py | 17 ++++-----
 3 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index ace0581db..66661bc66 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -17,10 +17,11 @@ package analytics_frontend;
 
 import "context.proto";
 import "kpi_manager.proto";
+import "kpi_value_api.proto";
 //import "kpi_sample_types.proto";
 
 service AnalyticsFrontendService {
-  rpc StartAnalyzer  (Analyzer      ) returns (AnalyzerId   ) {}
+  rpc StartAnalyzer  (Analyzer      ) returns (stream kpi_value_api.KpiValue) {}
   rpc StopAnalyzer   (AnalyzerId    ) returns (context.Empty) {}
   rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
 }
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 8bb6a17af..d9495bc4e 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -15,11 +15,13 @@
 
 import logging, grpc, json, queue
 
+from datetime        import datetime
 from typing          import Dict
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import KafkaError
 
+from common.proto.kpi_value_api_pb2           import KpiValueType, KpiValue
 from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
 from common.proto.context_pb2                 import Empty
 from common.method_wrappers.Decorator         import MetricsPool, safe_and_metered_rpc_method
@@ -48,17 +50,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartAnalyzer(self, 
                        request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
-                      ) -> AnalyzerId: # type: ignore
+                      ) -> KpiValue: # type: ignore
         LOGGER.info ("At Service gRPC message: {:}".format(request))
-        response = AnalyzerId()
+        response = KpiValue()
 
         self.db_obj.add_row_to_db(
             AnalyzerModel.ConvertAnalyzerToRow(request)
         )
         self.PublishStartRequestOnKafka(request)
-        
-        response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
-        return response
+        for key, value in self.StartResponseListener(request.analyzer_id.analyzer_id.uuid):
+            # LOGGER.debug("Response from ---> {:}, {:}".format(key, value))
+            response.kpi_id.kpi_id.uuid  = value['kpi_id']
+            response.timestamp.timestamp = datetime.strptime(value['time_stamp'], "%Y-%m-%dT%H:%M:%SZ").timestamp()
+            response.kpi_value_type.CopyFrom(self.ConvertValueToKpiValueType(value['kpi_value']))
+            yield response
+        # response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
 
     def PublishStartRequestOnKafka(self, analyzer_obj):
         """
@@ -203,12 +209,24 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
                 LOGGER.info('Unable to process filter response {:}'.format(e))
         except Exception as e:
             LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
-       
+
+    def ConvertValueToKpiValueType(self, value):
+        kpi_value_type = KpiValueType()
+        if isinstance(value, bool):  # Check bool first: bool is a subclass of int in Python
+            kpi_value_type.boolVal = value
+        elif isinstance(value, int):  # Check for integer type
+            kpi_value_type.int32Val = value  # Assuming int32Val for example
+        elif isinstance(value, float):  # Check for float type
+            kpi_value_type.floatVal = value
+        elif isinstance(value, str):  # Check for string type
+            kpi_value_type.stringVal = value
+        # Add other checks for different types as needed
+        return kpi_value_type
 
     def delivery_callback(self, err, msg):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
             print       ('Message delivery failed: {:}'.format(err))
-        # else:
-        #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
-        #     print('Message delivered to topic {:}'.format(msg.topic()))
+        else:
+            LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+            print('Message delivered to topic {:}'.format(msg.topic()))
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index d2428c01f..4424bab10 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -25,7 +25,7 @@ from common.Settings          import ( get_service_port_grpc, get_env_var_name,
                                       ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
 
 from common.tools.kafka.Variables                        import KafkaTopic
-from common.proto.analytics_frontend_pb2                 import AnalyzerId, AnalyzerList
+from common.proto.kpi_value_api_pb2                      import KpiValue
 from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
@@ -89,12 +89,13 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-# ----- core funtionality test -----
-# def test_StartAnalytics(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
-#     response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, AnalyzerId)
+# ----- core functionality test -----
+def test_StartAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StartAnalytic START: <<< ')
+    stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+    for response in stream:
+        LOGGER.debug(str(response))
+    assert isinstance(response, KpiValue)
 
 # To test start and stop listener together
 def test_StartStopAnalyzers(analyticsFrontend_client):
@@ -131,4 +132,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
 #         class_obj = AnalyticsFrontendServiceServicerImpl()
 #         for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
 #             LOGGER.debug(response)
-#             assert isinstance(response, tuple)
\ No newline at end of file
+#             assert isinstance(response, tuple)
-- 
GitLab