From c0c517f032299f1879339ea18742c01ab594fb15 Mon Sep 17 00:00:00 2001
From: Waleed Akbar <wakbar@cttc.es>
Date: Sun, 22 Sep 2024 16:33:57 +0000
Subject: [PATCH] Minor Changes in New Monitoring Services to Improve Overall
 Functionality

- Updated the local test-runner scripts in the `scripts` folder.
- Improved messages and test files across all services.
Telemetry and Analytics Backend:
- Wrapped Kafka message decoding in a `try`/`except` block so a malformed message is logged and skipped instead of stopping the request listener; a sketch of the pattern follows.
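  A minimal sketch of the guard, assuming a confluent-kafka message object (`receive_msg`) and a topic name as used in the service code; the dispatch body is elided:

    import json
    import logging

    LOGGER = logging.getLogger(__name__)

    def handle_request(receive_msg, topic_name: str):
        # Decoding is wrapped so a malformed message is logged and skipped
        # instead of terminating the listener loop.
        try:
            payload = json.loads(receive_msg.value().decode('utf-8'))
            key     = receive_msg.key().decode('utf-8')
            LOGGER.debug('Received request: {:} - {:}'.format(key, payload))
            # ... dispatch on the decoded payload ...
        except Exception as e:
            LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(topic_name, e))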
Telemetry Frontend and KPI Value API:
- Downgraded `apscheduler` from `3.10.4` to `3.10.1` in the requirements file.
Kafka:
- `get_kafka_address()` now reads `KFK_SERVER_ADDRESS` from the environment first, falling back to composing the address from `KFK_NAMESPACE` and `KFK_SERVER_PORT` (see the sketch below).
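  A minimal sketch of the lookup order using plain `os.environ` (the service uses the repository's `get_setting` helper; the template value below is an assumption for illustration):

    import os

    # Assumed shape of the address template; the real value lives in the service code.
    KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:}.svc.cluster.local:{:}'

    def get_kafka_address() -> str:
        # Prefer an explicitly configured address ...
        kafka_server_address = os.environ.get('KFK_SERVER_ADDRESS')
        if kafka_server_address is None:
            # ... otherwise compose it from the Kafka namespace and port settings.
            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(
                os.environ['KFK_NAMESPACE'], os.environ['KFK_SERVER_PORT'])
        return kafka_server_address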
Telemetry Backend:
- Added a condition so the collector backend runs indefinitely when the collector `duration` is set to `-1`.
- Changed the extracted `KpiValue` timestamp format to `"%Y-%m-%dT%H:%M:%SZ"` (UTC) to meet Apache Spark requirements; both changes are sketched after this list.
- Removed some unused packages from the `requirements.in` file.
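  A minimal sketch of the first two changes, assuming a `collector` dict and a `threading.Event` as in the service code:

    import time
    import threading
    from datetime import datetime, timezone

    def run_collector(collector: dict, stop_event: threading.Event):
        start_time = time.time()
        while not stop_event.is_set():
            # A duration of -1 means "run indefinitely"; only finite durations terminate the loop.
            if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']:
                break
            time.sleep(float(collector['interval']))

    def make_kpi_value(kpi_id: str, measured_kpi_value) -> dict:
        return {
            # ISO-8601 UTC timestamp, e.g. "2024-09-22T16:33:57Z", as required by Apache Spark.
            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "kpi_id"    : kpi_id,
            "kpi_value" : measured_kpi_value,
        }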
Deployment Script:
- Added `"telemetry"` and `"analytics"` component names to the `TFS_COMPONENTS` variable in the `my_deploy.sh` file.
---
 my_deploy.sh                                  |  2 +-
 scripts/run_tests_locally-kpi-DB.sh           |  2 +-
 scripts/run_tests_locally-telemetry-DB.sh     |  3 +-
 .../run_tests_locally-telemetry-backend.sh    |  2 +-
 .../service/AnalyticsBackendService.py        | 31 ++++++------
 src/analytics/backend/tests/messages.py       |  4 +-
 src/analytics/backend/tests/test_backend.py   | 30 +++++------
 src/analytics/frontend/requirements.in        |  2 +-
 src/analytics/frontend/tests/messages.py      |  5 +-
 src/analytics/frontend/tests/test_frontend.py | 50 +++++--------------
 src/common/tools/kafka/Variables.py           | 13 ++---
 src/kpi_manager/tests/test_messages.py        |  2 +
 src/kpi_value_api/requirements.in             |  2 +-
 src/kpi_value_api/tests/test_kpi_value_api.py |  7 +++
 .../service/TelemetryBackendService.py        | 30 ++++++-----
 .../tests/{messagesBackend.py => messages.py} |  0
 ...st_TelemetryBackend.py => test_backend.py} |  3 +-
 src/telemetry/frontend/tests/Messages.py      | 13 +++--
 src/telemetry/frontend/tests/test_frontend.py | 47 ++---------------
 src/telemetry/requirements.in                 |  6 +--
 20 files changed, 106 insertions(+), 148 deletions(-)
 rename src/telemetry/backend/tests/{messagesBackend.py => messages.py} (100%)
 rename src/telemetry/backend/tests/{test_TelemetryBackend.py => test_backend.py} (91%)

diff --git a/my_deploy.sh b/my_deploy.sh
index 88be82b63..5b535a959 100755
--- a/my_deploy.sh
+++ b/my_deploy.sh
@@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
 #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
 
 # Uncomment to activate Monitoring Framework (new)
-#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
+#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"
 
 # Uncomment to activate BGP-LS Speaker
 #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh
index 4953b49e0..ad1b4c57b 100755
--- a/scripts/run_tests_locally-kpi-DB.sh
+++ b/scripts/run_tests_locally-kpi-DB.sh
@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_manager/tests/test_kpi_db.py
diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh
index 4b9a41760..529774f2d 100755
--- a/scripts/run_tests_locally-telemetry-DB.sh
+++ b/scripts/run_tests_locally-telemetry-DB.sh
@@ -20,7 +20,8 @@ cd $PROJECTDIR/src
 # RCFILE=$PROJECTDIR/coverage/.coveragerc
 # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
 #     kpi_manager/tests/test_unitary.py
-
+CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
     telemetry/tests/test_telemetryDB.py
diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index 422574d0e..4867335a5 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -26,4 +26,4 @@ CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o js
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
-    telemetry/backend/tests/test_TelemetryBackend.py
+    telemetry/backend/tests/test_backend.py
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 658d23795..96b1f5242 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -46,10 +46,10 @@ class AnalyticsBackendService(GenericGrpcService):
         thresholds    = analyzer['thresholds']
         window_size   = analyzer['window_size']
         window_slider = analyzer['window_slider']
-        print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-            kpi_list, oper_list, thresholds, window_size, window_slider))
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-            kpi_list, oper_list, thresholds, window_size, window_slider))
+        # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+        #     kpi_list, oper_list, thresholds, window_size, window_slider))
+        # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+        #     kpi_list, oper_list, thresholds, window_size, window_slider))
         try:
             stop_event = threading.Event()
             thread = threading.Thread(target=SparkStreamer, 
@@ -86,6 +86,7 @@ class AnalyticsBackendService(GenericGrpcService):
         listener for requests on Kafka topic.
         """
         LOGGER.info("Request Listener is initiated ...")
+        print      ("Request Listener is initiated ...")
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
         while True:
@@ -98,17 +99,19 @@ class AnalyticsBackendService(GenericGrpcService):
                 else:
                     print("Consumer error: {}".format(receive_msg.error()))
                     break
-            analyzer      = json.loads(receive_msg.value().decode('utf-8'))
-            analyzer_uuid = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
-            print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+            try:
+                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
+                analyzer_uuid = receive_msg.key().decode('utf-8')
+                LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                print       ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
-            if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                self.TerminateAnalyzerBackend(analyzer_uuid)
-            else:
-                self.StartSparkStreamer(analyzer_uuid, analyzer)
-        LOGGER.debug("Stop Event activated. Terminating...")
-        print       ("Stop Event activated. Terminating...")
+                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
+                    self.TerminateAnalyzerBackend(analyzer_uuid)
+                else:
+                    self.StartSparkStreamer(analyzer_uuid, analyzer)
+            except Exception as e:
+                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
+                print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
 
     def TerminateAnalyzerBackend(self, analyzer_uuid):
         if analyzer_uuid in self.running_threads:
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index c3b78967e..e5faaa1f5 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -42,14 +42,14 @@ def get_threshold_dict():
 def create_analyzer():
     _create_analyzer                              = Analyzer()
     # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888" 
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 9221bb23e..c2e7fbe31 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -34,6 +34,7 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
+# --- To test Start Streamer functionality ---
 def test_StartSparkStreamer():
     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
     analyzer_obj = create_analyzer()
@@ -52,26 +53,25 @@ def test_StartSparkStreamer():
     response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
     assert isinstance(response, bool)
 
+# --- To test StartRequestListener functionality ---
 # def test_StartRequestListener():
 #     LOGGER.info('test_RunRequestListener')
 #     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
-#     LOGGER.debug(str(response)) 
-#     assert isinstance(response, tuple)
+#     threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start()
 
 # To test START and STOP communication together
-def test_StopRequestListener():
-    LOGGER.info('test_RunRequestListener')
-    LOGGER.info('Initiating StartRequestListener...')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
-    # LOGGER.debug(str(response_thread))
-    time.sleep(10)
-    LOGGER.info('Initiating StopRequestListener...')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
-    LOGGER.debug(str(response)) 
-    assert isinstance(response, bool)
+# def test_StopRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     LOGGER.info('Initiating StartRequestListener...')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     # LOGGER.debug(str(response_thread))
+#     time.sleep(10)
+#     LOGGER.info('Initiating StopRequestListener...')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
+#     LOGGER.debug(str(response)) 
+#     assert isinstance(response, bool)
 
 # To independently tests the SparkListener functionality
 # def test_SparkListener():
diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in
index d81b9ddbe..0b1ec921b 100644
--- a/src/analytics/frontend/requirements.in
+++ b/src/analytics/frontend/requirements.in
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-apscheduler==3.10.4
+apscheduler==3.10.1
 confluent-kafka==2.3.*
 psycopg2-binary==2.9.*
 SQLAlchemy==1.4.*
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 646de962e..eb25c33b0 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
 def create_analyzer_id():
     _create_analyzer_id                  = AnalyzerId()
     # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     return _create_analyzer_id
 
 def create_analyzer():
     _create_analyzer                              = Analyzer()
     # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index 9f5c040f3..1b4e0e14e 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -26,7 +26,6 @@ from common.Settings          import ( get_service_port_grpc, get_env_var_name,
 
 from common.tools.kafka.Variables                        import KafkaTopic
 from common.proto.kpi_value_api_pb2                      import KpiValue
-from common.proto.analytics_frontend_pb2                 import AnalyzerAlarms
 from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
@@ -34,7 +33,7 @@ from analytics.frontend.tests.messages                   import ( create_analyze
 from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
 from apscheduler.schedulers.background                   import BackgroundScheduler
 from apscheduler.triggers.interval                       import IntervalTrigger
-
+from common.proto.analytics_frontend_pb2                 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 
 ###########################
 # Tests Setup
@@ -85,46 +84,23 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 ###########################
 
 # --- "test_validate_kafka_topics" should be executed before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
-
-# # ----- core funtionality test -----
-def test_StartAnalytics(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StartAnalytic START: <<< ')
-    stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
-    for response in stream:
-        LOGGER.debug(str(response))
-    assert isinstance(response, KpiValue)
-
-# To test start and stop listener together
 def test_StartStopAnalyzers(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
-    LOGGER.info('--> StartAnalyzer')
+    LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
     added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
     LOGGER.debug(str(added_analyzer_id))
-    LOGGER.info(' --> Calling StartResponseListener... ')
-    class_obj = AnalyticsFrontendServiceServicerImpl()
-    response =  class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid)
-    LOGGER.debug(response)
-    LOGGER.info("waiting for timer to comlete ...")
-    time.sleep(3)
-    LOGGER.info('--> StopAnalyzer')
-    response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
+    assert isinstance(added_analyzer_id, AnalyzerId)
+
+def test_StopAnalytic(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StopAnalytic START: <<< ')
+    response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
     LOGGER.debug(str(response))
+    assert isinstance(response, Empty)
 
-# def test_SelectAnalytics(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
-#     response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, AnalyzerList)
-
-# def test_StopAnalytic(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
-#     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
+def test_SelectAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+    LOGGER.debug(str(response))
+    assert isinstance(response, AnalyzerList)
 
 # def test_ResponseListener():
 #         LOGGER.info(' >>> test_ResponseListener START <<< ')
diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index b5cb3bbe0..9e432d637 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -25,11 +25,12 @@ class KafkaConfig(Enum):
 
     @staticmethod
     def get_kafka_address() -> str:
-        # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
-        # if kafka_server_address is None:
-        KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
-        KFK_PORT             = get_setting('KFK_SERVER_PORT')
-        kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
+        kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
+        if kafka_server_address is None:
+            KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
+            KFK_PORT             = get_setting('KFK_SERVER_PORT')
+            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
+        # kafka_server_address = "127.0.0.1:9092"
         return kafka_server_address
         
     @staticmethod
@@ -90,4 +91,4 @@ class KafkaTopic(Enum):
                 return False
         return True
 
-# create all topics after the deployments (Telemetry and Analytics)
+# TODO: create all topics after the deployments (Telemetry and Analytics)
diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py
index 7b5c45859..08a2dbf73 100644
--- a/src/kpi_manager/tests/test_messages.py
+++ b/src/kpi_manager/tests/test_messages.py
@@ -27,6 +27,8 @@ def create_kpi_id_request():
 def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
     _create_kpi_request                                    = kpi_manager_pb2.KpiDescriptor()
     _create_kpi_request.kpi_id.kpi_id.uuid                 = str(uuid.uuid4())
+    # _create_kpi_request.kpi_id.kpi_id.uuid                 = "6e22f180-ba28-4641-b190-2287bf448888"
+    # _create_kpi_request.kpi_id.kpi_id.uuid                 = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_kpi_request.kpi_description                    = descriptor_name
     _create_kpi_request.kpi_sample_type                    = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
     _create_kpi_request.device_id.device_uuid.uuid         = 'DEV2' 
diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in
index e95d6d8bb..0615fa833 100644
--- a/src/kpi_value_api/requirements.in
+++ b/src/kpi_value_api/requirements.in
@@ -15,4 +15,4 @@
 confluent-kafka==2.3.*
 requests==2.27.*
 prometheus-api-client==0.5.3
-apscheduler==3.10.4
+apscheduler==3.10.1
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py
index ea6b22585..c7eb8fef1 100644
--- a/src/kpi_value_api/tests/test_kpi_value_api.py
+++ b/src/kpi_value_api/tests/test_kpi_value_api.py
@@ -78,6 +78,13 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
+# def test_GetKpiAlarms(kpi_value_api_client):
+#     LOGGER.debug(" >>> test_GetKpiAlarms")
+#     stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
+#     for response in stream:
+#         LOGGER.debug(str(response))
+#     assert isinstance(response, KpiAlarms)
+
 def test_store_kpi_values(kpi_value_api_client):
     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
     response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 078fa5896..f3cf18d65 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -17,7 +17,8 @@ import time
 import random
 import logging
 import threading
-from typing import Any, Dict
+from typing   import Any, Dict
+from datetime import datetime, timezone
 # from common.proto.context_pb2 import Empty
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import Consumer as KafkaConsumer
@@ -53,6 +54,8 @@ class TelemetryBackendService(GenericGrpcService):
         """
         listener for requests on Kafka topic.
         """
+        LOGGER.info('Telemetry backend request listener is running ...')
+        print      ('Telemetry backend request listener is running ...')
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.REQUEST.value])
         while True:
@@ -65,16 +68,19 @@ class TelemetryBackendService(GenericGrpcService):
                 else:
                     print("Consumer error: {}".format(receive_msg.error()))
                     break
-            
-            collector = json.loads(receive_msg.value().decode('utf-8'))
-            collector_id = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
-            print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
+            try:
+                collector = json.loads(receive_msg.value().decode('utf-8'))
+                collector_id = receive_msg.key().decode('utf-8')
+                LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))
+                print('Received Collector: {:} - {:}'.format(collector_id, collector))
 
-            if collector['duration'] == -1 and collector['interval'] == -1:
-                self.TerminateCollectorBackend(collector_id)
-            else:
-                self.RunInitiateCollectorBackend(collector_id, collector)
+                if collector['duration'] == -1 and collector['interval'] == -1:
+                    self.TerminateCollectorBackend(collector_id)
+                else:
+                    self.RunInitiateCollectorBackend(collector_id, collector)
+            except Exception as e:
+                LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
+                print         ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
 
     def TerminateCollectorBackend(self, collector_id):
         if collector_id in self.running_threads:
@@ -101,7 +107,7 @@ class TelemetryBackendService(GenericGrpcService):
         print("Initiating backend for collector: ", collector_id)
         start_time = time.time()
         while not stop_event.is_set():
-            if time.time() - start_time >= collector['duration']:            # condition to terminate backend
+            if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']:            # condition to terminate backend
                 print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                 self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)       # Termination confirmation to frontend.
                 break
@@ -140,7 +146,7 @@ class TelemetryBackendService(GenericGrpcService):
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
-            "time_stamp": str(time.time()),
+            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
             "kpi_id"    : kpi_id,
             "kpi_value" : measured_kpi_value
         }
diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messages.py
similarity index 100%
rename from src/telemetry/backend/tests/messagesBackend.py
rename to src/telemetry/backend/tests/messages.py
diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_backend.py
similarity index 91%
rename from src/telemetry/backend/tests/test_TelemetryBackend.py
rename to src/telemetry/backend/tests/test_backend.py
index 665fa825e..4764d7f5f 100644
--- a/src/telemetry/backend/tests/test_TelemetryBackend.py
+++ b/src/telemetry/backend/tests/test_backend.py
@@ -34,5 +34,4 @@ LOGGER = logging.getLogger(__name__)
 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
     TelemetryBackendServiceObj = TelemetryBackendService()
-    response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
-    LOGGER.debug(str(response))
+    threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
\ No newline at end of file
diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py
index a0e93e8a1..e6d8ef439 100644
--- a/src/telemetry/frontend/tests/Messages.py
+++ b/src/telemetry/frontend/tests/Messages.py
@@ -22,15 +22,18 @@ from common.proto.kpi_manager_pb2 import KpiId
 def create_collector_id():
     _collector_id                   = telemetry_frontend_pb2.CollectorId()
     # _collector_id.collector_id.uuid = str(uuid.uuid4())
-    _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c"
+    _collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
     return _collector_id
 
 def create_collector_request():
     _create_collector_request                                = telemetry_frontend_pb2.Collector()
-    _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
-    _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
-    _create_collector_request.duration_s                     = float(random.randint(8, 16))
-    _create_collector_request.interval_s                     = float(random.randint(2, 4))
+    # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) 
+    _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
+    # _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
+    _create_collector_request.kpi_id.kpi_id.uuid             = "6e22f180-ba28-4641-b190-2287bf448888"
+    # _create_collector_request.duration_s                     = float(random.randint(8, 16))
+    _create_collector_request.duration_s                     = -1
+    _create_collector_request.interval_s                     = float(random.randint(3, 5))
     return _create_collector_request
 
 def create_collector_filter():
diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py
index 9c3f9d3a8..c3f8091c8 100644
--- a/src/telemetry/frontend/tests/test_frontend.py
+++ b/src/telemetry/frontend/tests/test_frontend.py
@@ -105,47 +105,10 @@ def test_SelectCollectors(telemetryFrontend_client):
     LOGGER.debug(str(response))
     assert isinstance(response, CollectorList)
 
-# ----- Non-gRPC method tests ----- 
-def test_RunResponseListener():
-    LOGGER.info(' >>> test_RunResponseListener START: <<< ')
-    TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
-    response = TelemetryFrontendServiceObj.RunResponseListener()     # becasue Method "run_kafka_listener" is not define in frontend.proto
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
-
-# ------- previous test ----------------
-
-# def test_verify_db_and_table():
-#     LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
-#     _engine = TelemetryEngine.get_engine()
-#     managementDB.create_database(_engine)
-#     managementDB.create_tables(_engine)
-
-# def test_StartCollector(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_StartCollector START: <<< ')
-#     response = telemetryFrontend_client.StartCollector(create_collector_request())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, CollectorId)
-
-# def test_run_kafka_listener():
-#     LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
-#     name_mapping = NameMapping()
-#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
-#     response = TelemetryFrontendServiceObj.run_kafka_listener()     # Method "run_kafka_listener" is not define in frontend.proto
+# # ----- Non-gRPC method tests ----- 
+# def test_RunResponseListener():
+#     LOGGER.info(' >>> test_RunResponseListener START: <<< ')
+#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
+#     response = TelemetryFrontendServiceObj.RunResponseListener()     # because method "run_kafka_listener" is not defined in frontend.proto
 #     LOGGER.debug(str(response))
 #     assert isinstance(response, bool)
-
-# def test_StopCollector(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_StopCollector START: <<< ')
-#     _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
-#     time.sleep(3)   # wait for small amount before call the stopCollecter()
-#     response = telemetryFrontend_client.StopCollector(_collector_id)
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
-
-# def test_select_collectors(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_select_collector requesting <<< ')
-#     response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
-#     LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, CollectorList)
\ No newline at end of file
diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in
index a0e78d2bf..4e9981afb 100644
--- a/src/telemetry/requirements.in
+++ b/src/telemetry/requirements.in
@@ -12,13 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-anytree==2.8.0
 APScheduler==3.10.1
-influx-line-protocol==0.1.4
 psycopg2-binary==2.9.3
 python-dateutil==2.8.2
 python-json-logger==2.0.2
 pytz==2024.1
-questdb==1.0.1
-requests==2.27.1
-xmltodict==0.12.0
\ No newline at end of file
+requests==2.27.1
\ No newline at end of file
-- 
GitLab