diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index ace0581db816bee1d0d20746f2b864dce602567b..bb6afb2e424b646fd8d9a6810c38a8a6863c230f 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -20,7 +20,7 @@ import "kpi_manager.proto";
 //import "kpi_sample_types.proto";
 
 service AnalyticsFrontendService {
-  rpc StartAnalyzer  (Analyzer      ) returns (AnalyzerId   ) {}
+  rpc StartAnalyzer  (Analyzer      ) returns (AnalyzerId)    {}
   rpc StopAnalyzer   (AnalyzerId    ) returns (context.Empty) {}
   rpc SelectAnalyzers(AnalyzerFilter) returns (AnalyzerList ) {}
 }
diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto
index dff96272e3d05756dd19a49ecaede7311b196540..4d3a1f216406841344d40712ea04ec82cedf04d0 100644
--- a/proto/kpi_value_api.proto
+++ b/proto/kpi_value_api.proto
@@ -19,8 +19,9 @@ import "context.proto";
 import "kpi_manager.proto";
 
 service KpiValueAPIService {
-	rpc StoreKpiValues  (KpiValueList)   returns (context.Empty) {}
-	rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList)  {}
+	rpc StoreKpiValues  (KpiValueList)      returns (context.Empty)    {}
+	rpc SelectKpiValues (KpiValueFilter)    returns (KpiValueList)     {}
+  rpc GetKpiAlarms    (kpi_manager.KpiId) returns (stream KpiAlarms) {}
 }
 
 message KpiValue {
@@ -50,3 +51,10 @@ message KpiValueFilter {
 	repeated context.Timestamp start_timestamp = 2;
 	repeated context.Timestamp end_timestamp   = 3;
 }
+
+message KpiAlarms {
+  context.Timestamp start_timestamp = 1;
+  context.Timestamp end_timestamp   = 2;
+  kpi_manager.KpiId kpi_id          = 3;
+  map<string, bool> alarms          = 4;
+}
diff --git a/src/analytics/README.md b/src/analytics/README.md
index 9663e5321ace6866491b90553553d9ccbf5793a1..ece11ea969caf1cf92fb474ae1371c9979231ac2 100644
--- a/src/analytics/README.md
+++ b/src/analytics/README.md
@@ -1,4 +1,118 @@
-# How to locally run and test Analytic service (To be added soon)
+# How to Locally Run and Test the Analytics Frontend Service
 
 ### Pre-requisets 
-The following requirements should be fulfilled before the execuation of Telemetry service.
+The following requirements should be fulfilled before running the Analytics service.
+
+1. A virtual environment exists with all the required packages listed in [requirements.in](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/frontend/requirements.in) successfully installed.
+2. The required database and tables must exist. The
+[Analytics DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/analytics/tests/test_analytics_db.py) Python file lists the functions that create the database and the tables.
+3. The Analytics backend service should be running.
+
+4. All required Kafka topics must exist. Call `create_all_topics` from the [Kafka class](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) to create any topics that do not already exist.
+```python
+from common.tools.kafka.Variables import KafkaTopic
+KafkaTopic.create_all_topics()
+```
+
+5. An input stream must be produced on the Kafka topic that the Spark Streamer consumes and to which it applies the defined thresholds; a minimal producer sketch follows this list.
+- Each message should be a JSON-encoded string in the following format:
+```
+'{"time_stamp": "2024-09-03T12:36:26Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 44.22}'
+```
+- `kpi_value` should be a float or an int.
+- The Kafka producer key should be the UUID of the Analyzer assigned when it was created.
+- Use the following Kafka topic to generate the stream: `KafkaTopic.ANALYTICS_RESPONSE.value`.
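+
+For illustration only (this sketch is not part of the service code), a minimal producer that publishes one such message could look like the following; the static UUID is the analyzer UUID used throughout this guide:
+```python
+import json
+from confluent_kafka import Producer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+
+# Build one sample message as described above: JSON string, keyed by the analyzer UUID.
+producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
+message  = {
+    "time_stamp": "2024-09-03T12:36:26Z",
+    "kpi_id"    : "6e22f180-ba28-4641-b190-2287bf448888",
+    "kpi_value" : 44.22
+}
+producer.produce(
+    KafkaTopic.ANALYTICS_RESPONSE.value,             # topic named in the previous bullet
+    key   = "efef4d95-1cf1-43c4-9742-95c283ddd7a6",  # UUID of the analyzer
+    value = json.dumps(message)
+)
+producer.flush()
+```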
+
+## Steps to create and start an Analyzer
+The analyzer can be declared in several ways; one straightforward declaration is shown below.
+
+The object-creation process for `_create_analyzer` involves defining an instance of the `Analyzer` message from the [gRPC definition](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/194-unable-to-correctly-extract-the-aggregation-function-names-from-the-dictionary-received-as/proto/analytics_frontend.proto) and populating its fields.
+
+```python
+from common.proto.analytics_frontend_pb2 import Analyzer
+_create_analyzer = Analyzer()
+```
+
+Here is a breakdown of how each field is populated:
+
+### 1. **Analyzer ID**
+   - `analyzer_id`: This field uses a unique ID to identify the analyzer. In this case, the ID is a UUID.
+     ```python
+     _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+     ```
+   - The UUID can also be generated dynamically using Python's `uuid.uuid4()` (see the one-liner below); for now, a static UUID is used.
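+   - For illustration, dynamic generation would use only Python's standard `uuid` module:
+     ```python
+     import uuid
+     _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+     ```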
+
+### 2. **Algorithm Name**
+   - `algorithm_name`: Specifies the name of the algorithm to be executed by the analyzer.
+     ```python
+     _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
+     ```
+
+### 3. **Operation Mode**
+   - `operation_mode`: Sets the mode in which the analyzer operates; in this case it is set to `ANALYZEROPERATIONMODE_STREAMING`.
+     ```python
+     _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+     ```
+
+### 4. **Input KPI IDs**
+   - `input_kpi_ids`: This is a list of KPI IDs that will be used as input for the analysis. KPI IDs are represented using `KpiId`, and UUIDs are assigned to each input. The Spark streamer assumes that the provided KPIs exist in the KPI Descriptor database.
+     ```python
+     _kpi_id = KpiId()
+     _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+     _create_analyzer.input_kpi_ids.append(_kpi_id)
+     
+     _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+     _create_analyzer.input_kpi_ids.append(_kpi_id)
+     ```
+
+### 5. **Output KPI IDs**
+   - `output_kpi_ids`: A list of KPI IDs that are produced as output after analysis. Each one is generated and appended to the list.
+     ```python
+     _kpi_id = KpiId()
+     _create_analyzer.output_kpi_ids.append(_kpi_id)
+     ```
+
+### 6. **Parameters**
+   - `parameters`: This is a dictionary containing key-value pairs of various parameters used by the analyzer. These values are often algorithm-specific.
+     - **Thresholds**: A dictionary whose keys follow the pattern `<operation>_<any_name>`, where `<operation>` is one of `min`, `max`, `avg`, `first`, `last`, or `stdev`; for example, `min_latency`, `max_bandwidth`, or `avg_datarate`. Each value is a `(fail, raise)` pair of bounds.
+       ```python
+       _threshold_dict = {
+           'min_latency' : (00, 10), 
+           'max_bandwidth': (40, 50), 
+           'avg_datarate': (00, 10)
+       }
+       _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
+       ```
+     - **Window Size**: Specifies the size of the time window (e.g., `60 seconds`).
+       ```python
+       _create_analyzer.parameters['window_size'] = "60 seconds"
+       ```
+     - **Window Slider**: Defines the sliding window interval (e.g., `30 seconds`).
+       ```python
+       _create_analyzer.parameters['window_slider'] = "30 seconds"
+       ```
+
+###  **Calling `StartAnalyzer` with an Analyzer Frontend Object**
+   - The following code demonstrates how to call `StartAnalyzer()` with an Analyzer object:
+```python
+from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
+
+analytics_client_object = AnalyticsFrontendClient()
+analytics_client_object.StartAnalyzer(_create_analyzer)
+```
+
+### **How to Receive Analyzer Responses**
+   - There is a non-gRPC method in the analyzer frontend called `StartResponseListener(<analyzer_uuid>)`. The `analyzer_uuid` is the UUID of the analyzer provided when calling `StartAnalyzer()`. The following code will log the responses:
+```python
+from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
+
+analytic_frontend_service_object = AnalyticsFrontendServiceServicerImpl()
+for response in analytic_frontend_service_object.StartResponseListener(<analyzer_uuid>):
+    LOGGER.debug(response)
+```
+
+### **Understanding the Output of the Analyzer**
+- **Output Column Names**: The output JSON string will include two keys for each defined threshold. For example, the `min_latency` threshold will generate two keys: `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE`.
+  - `min_latency_THRESHOLD_FAIL` is triggered if the aggregated value computed over the defined window falls below the lower bound of the threshold range.
+  - `min_latency_THRESHOLD_RAISE` is triggered if the aggregated value computed over the defined window exceeds the upper bound of the threshold range.
+- `min_latency_THRESHOLD_FAIL` and `min_latency_THRESHOLD_RAISE` are set to `TRUE` when triggered; otherwise they are `FALSE` (see the example message below).
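+
+For illustration only, one output message (as consumed by the response listener) could look like the example below; the exact set of aggregate columns depends on the thresholds configured for the analyzer:
+```
+{
+  "window"  : {"start": "2024-09-03T12:36:00.000Z", "end": "2024-09-03T12:37:00.000Z"},
+  "kpi_id"  : "6e22f180-ba28-4641-b190-2287bf448888",
+  "min_latency"                : 4.2,
+  "min_latency_THRESHOLD_FAIL" : true,
+  "min_latency_THRESHOLD_RAISE": false
+}
+```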
diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile
index 17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22..df5cd7fbde6dc45780fdb3a333594ab11c1ab146 100644
--- a/src/analytics/backend/Dockerfile
+++ b/src/analytics/backend/Dockerfile
@@ -53,6 +53,15 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
 RUN rm *.proto
 RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
 
+# Install Java (required for PySpark)
+RUN apt-get update && \
+    apt-get install -y default-jdk && \
+    apt-get clean
+
+# Set JAVA_HOME environment variable
+ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+ENV PATH=$JAVA_HOME/bin:$PATH
+
 # Create component sub-folders, get specific Python packages
 RUN mkdir -p /var/teraflow/analytics/backend
 WORKDIR /var/teraflow/analytics/backend
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 595603567fe537d9f7b33224cba0fe016a439631..658d237956b4ed3addbbc295ef0d19dd4b977257 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -42,7 +42,7 @@ class AnalyticsBackendService(GenericGrpcService):
 
     def StartSparkStreamer(self, analyzer_uuid, analyzer):
         kpi_list      = analyzer['input_kpis'] 
-        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
+        oper_list     = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
         thresholds    = analyzer['thresholds']
         window_size   = analyzer['window_size']
         window_slider = analyzer['window_slider']
@@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService):
             LOGGER.error("Failed to terminate analytics backend {:}".format(e))
             return False
 
-    def install_services(self):
-        stop_event = threading.Event()
-        thread = threading.Thread(target=self.RequestListener,
-                                  args=(stop_event,) )
-        thread.start()
-        return (thread, stop_event)
+    def install_servicers(self):
+        threading.Thread(target=self.RequestListener, args=()).start()
 
-    def RequestListener(self, stop_event):
+    def RequestListener(self):
         """
         listener for requests on Kafka topic.
         """
+        LOGGER.info("Request Listener is initiated ...")
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
-        while not stop_event.is_set():
+        while True:
             receive_msg = consumer.poll(2.0)
             if receive_msg is None:
                 continue
@@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService):
                 else:
                     print("Consumer error: {}".format(receive_msg.error()))
                     break
-            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
+            analyzer      = json.loads(receive_msg.value().decode('utf-8'))
             analyzer_uuid = receive_msg.key().decode('utf-8')
             LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
             print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 96e1aa05d898ffdd23c533b74ee87fbf03f54576..f204c6247436177cd032c777c048ecb165051ec2 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams():   # TODO:  create get_kafka_consumer() in comm
     return {
             # "kafka.bootstrap.servers": '127.0.0.1:9092',
             "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
-            "subscribe"              : KafkaTopic.VALUE.value,
+            "subscribe"              : KafkaTopic.VALUE.value,         # topic should have atleast one message before spark session 
             "startingOffsets"        : 'latest',
             "failOnDataLoss"         : 'false'              # Optional: Set to "true" to fail the query on data loss
         }
@@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds):
     for col_name, (fail_th, raise_th) in thresholds.items():
         # Apply TH-Fail condition (if column value is less than the fail threshold)
         aggregated_df = aggregated_df.withColumn(
-            f"{col_name}_THRESHOLD_FAIL", 
+            f"{col_name}_THRESHOLD_FALL", 
             when(col(col_name) < fail_th, True).otherwise(False)
         )
         # Apply TH-RAISE condition (if column value is greater than the raise threshold)
@@ -128,11 +128,11 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
 
         # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
         query = thresholded_stream_data \
-            .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
+            .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
             .writeStream \
             .format("kafka") \
             .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
-            .option("topic",                   KafkaTopic.ANALYTICS_RESPONSE.value) \
+            .option("topic",                   KafkaTopic.ALARMS.value) \
             .option("checkpointLocation",      "analytics/.spark/checkpoint") \
             .outputMode("update")
 
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index 9acd6ad9dffe4a5b10b107a6923ed85170ee141f..c3b78967efe13eef9a60e19e50e56bdfca4a410d 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -12,6 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import uuid
+import json
+from common.proto.kpi_manager_pb2        import KpiId
+from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
+                                                Analyzer )
 
 def get_kpi_id_list():
     return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
@@ -32,3 +37,37 @@ def get_threshold_dict():
     return {
         op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
     }
+
+
+def create_analyzer():
+    _create_analyzer                              = Analyzer()
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
+    _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+    
+    _kpi_id = KpiId()
+    # input IDs to analyze
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # output IDs after analysis
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    # parameter
+    _threshold_dict = {
+        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value' :(00, 10), 'last_value'  :(40, 50), 'stdev_value':(00, 10)}
+    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
+    _create_analyzer.parameters['window_size']     = "60 seconds"     # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
+    _create_analyzer.parameters['window_slider']   = "30 seconds"     # should be less than window size
+    _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. Not implemented yet
+
+    return _create_analyzer
\ No newline at end of file
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 2f40faba94ef7081db609116e8fd869e3d119a24..bc0f47eb213f705a71f46de6a45d38b1c6f37e96 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -12,12 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
+import time, json
+from typing import Dict
 import logging
-import threading
+from threading import Event, Thread
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
+from .messages import create_analyzer
 
 LOGGER = logging.getLogger(__name__)
 
@@ -32,6 +34,24 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
+def test_StartSparkStreamer():
+    LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+    analyzer_obj = create_analyzer()
+    analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+    analyzer_to_generate : Dict = {
+        "algo_name"       : analyzer_obj.algorithm_name,
+        "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+        "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+        "oper_mode"       : analyzer_obj.operation_mode,
+        "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+        "window_size"     : analyzer_obj.parameters["window_size"],
+        "window_slider"   : analyzer_obj.parameters["window_slider"],
+        # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
+    }
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
+    assert isinstance(response, bool)
+
 # def test_StartRequestListener():
 #     LOGGER.info('test_RunRequestListener')
 #     AnalyticsBackendServiceObj = AnalyticsBackendService()
@@ -49,9 +69,31 @@ def test_StopRequestListener():
     time.sleep(10)
     LOGGER.info('Initiating StopRequestListener...')
     AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
-    LOGGER.debug(str(response)) 
-    assert isinstance(response, bool)
+    AnalyticsBackendServiceObj.stop_event = Event()
+    listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
+    listener_thread.start()
+
+    time.sleep(2000)
+
+    # AnalyticsBackendServiceObj.stop_event.set()
+    # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
+    # listener_thread.join(timeout=10)
+    # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
+    LOGGER.info('Completed test_RunRequestListener')
+
+# To test START and STOP communication together
+# def test_StopRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     LOGGER.info('Initiating StartRequestListener...')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     # LOGGER.debug(str(response_thread))
+#     time.sleep(10)
+#     LOGGER.info('Initiating StopRequestListener...')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
+#     LOGGER.debug(str(response)) 
+#     assert isinstance(response, bool)
 
 # To independently tests the SparkListener functionality
 # def test_SparkListener():
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index a7fc8d49248ff01a860accac1b64a29d5533069f..9ffacecc30fac40bb4899b8889386bc23a7609ac 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -12,13 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-import logging, grpc, json, queue
+import logging, grpc, json
 
 from typing          import Dict
-from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import Producer as KafkaProducer
-from confluent_kafka import KafkaError
 
 from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
 from common.proto.context_pb2                 import Empty
@@ -27,8 +24,7 @@ from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, Analy
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
 from analytics.database.Analyzer_DB           import AnalyzerDB
 from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
-from apscheduler.schedulers.background        import BackgroundScheduler
-from apscheduler.triggers.interval            import IntervalTrigger
+
 
 LOGGER           = logging.getLogger(__name__)
 METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
@@ -41,9 +37,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         self.result_queue   = queue.Queue()
         self.scheduler      = BackgroundScheduler()
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
-        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-                                            'group.id'           : 'analytics-frontend',
-                                            'auto.offset.reset'  : 'latest'})
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartAnalyzer(self, 
@@ -56,7 +49,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             AnalyzerModel.ConvertAnalyzerToRow(request)
         )
         self.PublishStartRequestOnKafka(request)
-        
         response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
         return response
 
@@ -174,15 +166,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         )
         LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
         self.kafka_producer.flush()
-        self.StopListener()
-
-    def StopListener(self):
-        """
-        Gracefully stop the Kafka listener and the scheduler.
-        """
-        LOGGER.info("Stopping Kafka listener...")
-        self.scheduler.shutdown()
-        LOGGER.info("Kafka listener stopped.")
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def SelectAnalyzers(self, 
@@ -202,7 +185,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
                 LOGGER.info('Unable to process filter response {:}'.format(e))
         except Exception as e:
             LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
-       
 
     def delivery_callback(self, err, msg):
         if err:
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index 44e84e4683bcdcec72e572b8e4deea903bf0de65..4583a45ac12acde9da81c4b7a15165db99fb38bb 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -25,7 +25,8 @@ from common.Settings          import ( get_service_port_grpc, get_env_var_name,
                                       ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
 
 from common.tools.kafka.Variables                        import KafkaTopic
-from common.proto.analytics_frontend_pb2                 import AnalyzerId, AnalyzerList
+from common.proto.kpi_value_api_pb2                      import KpiValue
+from common.proto.analytics_frontend_pb2                 import AnalyzerAlarms
 from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
@@ -89,12 +90,13 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 #     response = KafkaTopic.create_all_topics()
 #     assert isinstance(response, bool)
 
-# ----- core funtionality test -----
-# def test_StartAnalytics(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
-#     response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, AnalyzerId)
+# # ----- core funtionality test -----
+def test_StartAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StartAnalytic START: <<< ')
+    stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+    for response in stream:
+        LOGGER.debug(str(response))
+    assert isinstance(response, KpiValue)
 
 # To test start and stop listener together
 def test_StartStopAnalyzers(analyticsFrontend_client):
diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index 73b633e23cd55aefeed9b8075f2ad35348fc83ef..cadeec6ed331f599411d6480769985673f8d584d 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -46,6 +46,7 @@ class KafkaTopic(Enum):
     RAW                = 'topic_raw' 
     LABELED            = 'topic_labeled'
     VALUE              = 'topic_value'
+    ALARMS             = 'topic_alarms'
     ANALYTICS_REQUEST  = 'topic_request_analytics'
     ANALYTICS_RESPONSE = 'topic_response_analytics'
 
@@ -77,8 +78,8 @@ class KafkaTopic(Enum):
                 # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
                 if topic not in topic_metadata.topics:
                     # If the topic does not exist, create a new topic
-                    print("Topic {:} does not exist. Creating...".format(topic))
-                    LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
+                    # print("Topic {:} does not exist. Creating...".format(topic))
+                    # LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
                     new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
                     KafkaConfig.get_admin_client().create_topics([new_topic])
                 else:
diff --git a/src/kpi_value_api/client/KpiValueApiClient.py b/src/kpi_value_api/client/KpiValueApiClient.py
index f432271cfb7c8136f72156330b25d0b82b934d99..dfc5f07254a30db34a20ee8d0eae931cfd0ce571 100644
--- a/src/kpi_value_api/client/KpiValueApiClient.py
+++ b/src/kpi_value_api/client/KpiValueApiClient.py
@@ -15,17 +15,18 @@
 import grpc, logging
 
 from common.Constants import ServiceNameEnum
-from common.Settings import get_service_host, get_service_port_grpc
-from common.tools.client.RetryDecorator import retry, delay_exponential
-from common.tools.grpc.Tools import grpc_message_to_json_string
+from common.Settings  import get_service_host, get_service_port_grpc
 
-from common.proto.context_pb2 import Empty
-from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter
+from common.tools.client.RetryDecorator  import retry, delay_exponential
+from common.tools.grpc.Tools             import grpc_message_to_json_string
+from common.proto.context_pb2            import Empty
+from common.proto.kpi_manager_pb2        import KpiId
+from common.proto.kpi_value_api_pb2      import KpiValueList, KpiValueFilter, KpiAlarms
 from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub
 
-LOGGER = logging.getLogger(__name__)
-MAX_RETRIES = 10
-DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
+LOGGER          = logging.getLogger(__name__)
+MAX_RETRIES     = 10
+DELAY_FUNCTION  = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
 RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
 
 class KpiValueApiClient:
@@ -34,8 +35,8 @@ class KpiValueApiClient:
         if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
         self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
         LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
-        self.channel = None
-        self.stub = None
+        self.channel  = None
+        self.stub     = None
         self.connect()
         LOGGER.debug('Channel created')
 
@@ -46,18 +47,25 @@ class KpiValueApiClient:
     def close(self):
         if self.channel is not None: self.channel.close()
         self.channel = None
-        self.stub = None
+        self.stub    = None
     
     @RETRY_DECORATOR
-    def StoreKpiValues(self, request: KpiValueList) -> Empty:
+    def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore
         LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.StoreKpiValues(request)
         LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
         return response
         
     @RETRY_DECORATOR
-    def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList:
+    def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore
         LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
         response = self.stub.SelectKpiValues(request)
         LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
         return response
+
+    @RETRY_DECORATOR
+    def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore
+        LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request)))
+        response = self.stub.GetKpiAlarms(request)
+        LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
+        return response
diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in
index f5695906a8d02d55e15960a76986b8d03f02dba1..e95d6d8bbd81abca7eb2622e1faf6af473fcdb12 100644
--- a/src/kpi_value_api/requirements.in
+++ b/src/kpi_value_api/requirements.in
@@ -14,4 +14,5 @@
 
 confluent-kafka==2.3.*
 requests==2.27.*
-prometheus-api-client==0.5.3
\ No newline at end of file
+prometheus-api-client==0.5.3
+apscheduler==3.10.4
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index 4ea978fafc8d7454d41f64182d553d030215113a..0f57f88219a74108a555cf87e9bdb98999fd5da2 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -12,18 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, grpc, json
+from datetime import datetime
+import logging, grpc, json, queue
 from typing import Dict
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import KafkaError
 
 from common.proto.context_pb2 import Empty
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
 from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
 from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
-from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
-
+from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType
+from apscheduler.schedulers.background        import BackgroundScheduler
+from apscheduler.triggers.interval            import IntervalTrigger
 from confluent_kafka import Producer as KafkaProducer
+from confluent_kafka import Consumer as KafkaConsumer
 
 from prometheus_api_client import PrometheusConnect
 from prometheus_api_client.utils import parse_datetime
@@ -37,8 +41,14 @@ PROM_URL     = "http://prometheus-k8s.monitoring.svc.cluster.local:9090"    # TO
 class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
     def __init__(self):
         LOGGER.debug('Init KpiValueApiService')
+        self.listener_topic = KafkaTopic.ALARMS.value
+        self.result_queue   = queue.Queue()
+        self.scheduler      = BackgroundScheduler()
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
-
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                            'group.id'           : 'analytics-frontend',
+                                            'auto.offset.reset'  : 'latest'})
+        
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                        ) -> Empty:
@@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                     kpi_value = KpiValue()
                     kpi_value.kpi_id.kpi_id  = record['metric']['__name__'],      
                     kpi_value.timestamp      = value[0],      
-                    kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
+                    kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
                     response.kpi_value_list.append(kpi_value)
         return response
     
     def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
-        print("--- START -----")
-
         kpi_id = KpiId()
         kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
         # print("KpiId generated: {:}".format(kpi_id))
-
         try:
             kpi_descriptor_object = KpiDescriptor()
             kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
@@ -135,26 +142,91 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
             LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
             print ("Unable to get KpiDescriptor. Error: {:}".format(e))
 
-    def ConverValueToKpiValueType(self, value):
-        # Check if the value is an integer (int64)
+    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
+    def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore
+        """
+        Get alarms from Kafka and return them periodically.
+        """
+        LOGGER.debug('GetKpiAlarms: {:}'.format(request))
+        response = KpiAlarms()
+
+        for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
+            response.start_timestamp.timestamp = datetime.strptime(
+                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
+            response.end_timestamp.timestamp = datetime.strptime(
+                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
+            response.kpi_id.kpi_id.uuid  = value['kpi_id']
+            for key, threshold in value.items():
+                if "THRESHOLD_" in key:
+                    response.alarms[key] = threshold
+
+            yield response
+
+    def StartResponseListener(self, filter_key=None):
+        """
+        Start the Kafka response listener with APScheduler and return key-value pairs periodically.
+        """
+        LOGGER.info("Starting StartResponseListener")
+        # Schedule the ResponseListener at fixed intervals
+        self.scheduler.add_job(
+            self.response_listener,
+            trigger=IntervalTrigger(seconds=5),
+            args=[filter_key], 
+            id=f"response_listener_{self.listener_topic}",
+            replace_existing=True
+        )
+        self.scheduler.start()
+        LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
         try:
-            int_value = int(value)
-            return KpiValueType(int64Val=int_value)
-        except (ValueError, TypeError):
-            pass
-        # Check if the value is a float
+            while True:
+                LOGGER.info("entering while...")
+                key, value = self.result_queue.get()  # Wait until a result is available
+                LOGGER.info("In while true ...")
+                yield key, value  # Yield the result to the calling function
+        except KeyboardInterrupt:
+            LOGGER.warning("Listener stopped manually.")
+        finally:
+            self.StopListener()
+
+    def response_listener(self, filter_key=None):
+        """
+        Poll Kafka messages and put key-value pairs into the queue.
+        """
+        LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
+
+        consumer = self.kafka_consumer
+        consumer.subscribe([self.listener_topic])
+        msg = consumer.poll(2.0)
+        if msg is None:
+            return
+        elif msg.error():
+            if msg.error().code() != KafkaError._PARTITION_EOF:
+                LOGGER.error(f"Kafka error: {msg.error()}")
+            return
         try:
-            float_value = float(value)
-            return KpiValueType(floatVal=float_value)
-        except (ValueError, TypeError):
-            pass
-        # Check if the value is a boolean
-        if value.lower() in ['true', 'false']:
-            bool_value = value.lower() == 'true'
-            return KpiValueType(boolVal=bool_value)
-        # If none of the above, treat it as a string
-        return KpiValueType(stringVal=value)
+            key = msg.key().decode('utf-8') if msg.key() else None
+            if filter_key is not None and key == filter_key:
+                value = json.loads(msg.value().decode('utf-8'))
+                LOGGER.info(f"Received key: {key}, value: {value}")
+                self.result_queue.put((key, value))
+            else:
+                LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
+        except Exception as e:
+            LOGGER.error(f"Error processing Kafka message: {e}")
 
     def delivery_callback(self, err, msg):
         if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
         else:   LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+
+    def ConverValueToKpiValueType(self, value):
+        kpi_value_type = KpiValueType()
+        if isinstance(value, int):
+            kpi_value_type.int32Val = value
+        elif isinstance(value, float):
+            kpi_value_type.floatVal = value
+        elif isinstance(value, str):
+            kpi_value_type.stringVal = value
+        elif isinstance(value, bool):
+            kpi_value_type.boolVal = value
+        # Add other checks for different types as needed
+        return kpi_value_type
diff --git a/src/kpi_value_api/tests/messages.py b/src/kpi_value_api/tests/messages.py
index d8ad14bd44eebc1e9412cfd5ff2973e6018c95e9..50240c0154deff33dfdbb797cd5e0fca9a05c8ab 100644
--- a/src/kpi_value_api/tests/messages.py
+++ b/src/kpi_value_api/tests/messages.py
@@ -13,9 +13,16 @@
 # limitations under the License.
 
 import uuid, time
+from common.proto import kpi_manager_pb2
 from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
 
 
+def create_kpi_id_request():
+    _create_kpi_id = kpi_manager_pb2.KpiId()
+    _create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+    # _create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    return _create_kpi_id
+
 def create_kpi_value_list():
     _create_kpi_value_list = KpiValueList()
     # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB.
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py
index 307b5cdad4e6503a774e308f669fc44762f84bf1..ac17f6f987d437ee6dacd7dfdc7a1de7a8965343 100644
--- a/src/kpi_value_api/tests/test_kpi_value_api.py
+++ b/src/kpi_value_api/tests/test_kpi_value_api.py
@@ -21,8 +21,8 @@ from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
 from kpi_value_api.service.KpiValueApiService import KpiValueApiService
 from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
-from kpi_value_api.tests.messages import create_kpi_value_list
-
+from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request
+from common.proto.kpi_value_api_pb2 import KpiAlarms
 
 LOCAL_HOST = '127.0.0.1'
 KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)  # type: ignore
@@ -78,7 +78,14 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_store_kpi_values(kpi_value_api_client):
-    LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
-    response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
-    assert isinstance(response, Empty)
+def test_GetKpiAlarms(kpi_value_api_client):
+    LOGGER.debug(" >>> test_GetKpiAlarms")
+    stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
+    for response in stream:
+        LOGGER.debug(str(response))
+    assert isinstance(response, KpiAlarms)
+
+# def test_store_kpi_values(kpi_value_api_client):
+#     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
+#     response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
+#     assert isinstance(response, Empty)
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 6ab841238f446a2895cd163fab4b7eb05eaa3176..078fa5896d5fb5033833e0e2ef2248613ef80c18 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -83,7 +83,7 @@ class TelemetryBackendService(GenericGrpcService):
             thread.join()
             print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
             del self.running_threads[collector_id]
-            self.GenerateCollectorResponse(collector_id, "-1", -1)          # Termination confirmation to frontend.
+            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
         else:
             print ('Backend collector {:} not found'.format(collector_id))
 
@@ -103,11 +103,28 @@ class TelemetryBackendService(GenericGrpcService):
         while not stop_event.is_set():
             if time.time() - start_time >= collector['duration']:            # condition to terminate backend
                 print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
-                self.GenerateCollectorResponse(collector_id, "-1", -1)       # Termination confirmation to frontend.
+                self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)       # Termination confirmation to frontend.
                 break
             self.ExtractKpiValue(collector_id, collector['kpi_id'])
             time.sleep(collector['interval'])
 
+    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
+        """
+        Method to write the KPI termination signal on the RESPONSE Kafka topic.
+        """
+        producer = self.kafka_producer
+        kpi_value : Dict = {
+            "kpi_id"    : kpi_id,
+            "kpi_value" : measured_kpi_value,
+        }
+        producer.produce(
+            KafkaTopic.RESPONSE.value, # TODO: to  the topic ...
+            key      = collector_id,
+            value    = json.dumps(kpi_value),
+            callback = self.delivery_callback
+        )
+        producer.flush()
+
     def ExtractKpiValue(self, collector_id: str, kpi_id: str):
         """
         Method to extract kpi value.
@@ -123,117 +140,27 @@ class TelemetryBackendService(GenericGrpcService):
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
+            "time_stamp": str(time.time()),
             "kpi_id"    : kpi_id,
             "kpi_value" : measured_kpi_value
         }
         producer.produce(
-            KafkaTopic.RESPONSE.value,
+            KafkaTopic.VALUE.value, # TODO: to  the topic ...
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
         )
         producer.flush()
 
-    def GenerateRawMetric(self, metrics: Any):
-        """
-        Method writes raw metrics on VALUE Kafka topic
-        """
-        producer = self.kafka_producer
-        some_metric : Dict = {
-            "some_id"    : metrics
-        }
-        producer.produce(
-            KafkaTopic.VALUE.value,
-            key      = 'raw',
-            value    = json.dumps(some_metric),
-            callback = self.delivery_callback
-        )
-        producer.flush()
-
     def delivery_callback(self, err, msg):
         """
         Callback function to handle message delivery status.
         Args: err (KafkaError): Kafka error object.
               msg (Message): Kafka message object.
         """
-        if err: print(f'Message delivery failed: {err}')
-        # else:   print(f'Message delivered to topic {msg.topic()}')
-
-# # ----------- BELOW: Actual Implementation of Kafka Producer with Node Exporter -----------
-#     @staticmethod
-#     def fetch_single_node_exporter_metric():
-#         """
-#         Method to fetch metrics from Node Exporter.
-#         Returns:
-#             str: Metrics fetched from Node Exporter.
-#         """
-#         KPI = "node_network_receive_packets_total"
-#         try:
-#             response = requests.get(EXPORTER_ENDPOINT) # type: ignore
-#             LOGGER.info("Request status {:}".format(response))
-#             if response.status_code == 200:
-#                 # print(f"Metrics fetched sucessfully...")
-#                 metrics = response.text
-#                 # Check if the desired metric is available in the response
-#                 if KPI in metrics:
-#                     KPI_VALUE = TelemetryBackendService.extract_metric_value(metrics, KPI)
-#                     # Extract the metric value
-#                     if KPI_VALUE is not None:
-#                         LOGGER.info("Extracted value of {:} is {:}".format(KPI, KPI_VALUE))
-#                         print(f"Extracted value of {KPI} is: {KPI_VALUE}")
-#                         return KPI_VALUE
-#             else:
-#                 LOGGER.info("Failed to fetch metrics. Status code: {:}".format(response.status_code))
-#                 # print(f"Failed to fetch metrics. Status code: {response.status_code}")
-#                 return None
-#         except Exception as e:
-#             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
-#             # print(f"Failed to fetch metrics: {str(e)}")
-#             return None
-
-#     @staticmethod
-#     def extract_metric_value(metrics, metric_name):
-#         """
-#         Method to extract the value of a metric from the metrics string.
-#         Args:
-#             metrics (str): Metrics string fetched from Exporter.
-#             metric_name (str): Name of the metric to extract.
-#         Returns:
-#             float: Value of the extracted metric, or None if not found.
-#         """
-#         try:
-#             # Find the metric line containing the desired metric name
-#             metric_line = next(line for line in metrics.split('\n') if line.startswith(metric_name))
-#             # Split the line to extract the metric value
-#             metric_value = float(metric_line.split()[1])
-#             return metric_value
-#         except StopIteration:
-#             print(f"Metric '{metric_name}' not found in the metrics.")
-#             return None
-
-#     @staticmethod
-#     def stream_node_export_metrics_to_raw_topic():
-#         try:
-#             while True:
-#                 response = requests.get(EXPORTER_ENDPOINT)
-#                 # print("Response Status {:} ".format(response))
-#                 # LOGGER.info("Response Status {:} ".format(response))
-#                 try: 
-#                     if response.status_code == 200:
-#                         producerObj = KafkaProducer(PRODUCER_CONFIG)
-#                         producerObj.produce(KAFKA_TOPICS['raw'], key="raw", value= str(response.text), callback=TelemetryBackendService.delivery_callback)
-#                         producerObj.flush()
-#                         LOGGER.info("Produce to topic")
-#                     else:
-#                         LOGGER.info("Didn't received expected response. Status code: {:}".format(response.status_code))
-#                         print(f"Didn't received expected response. Status code: {response.status_code}")
-#                         return None
-#                     time.sleep(15)
-#                 except Exception as e:
-#                     LOGGER.info("Failed to process response. Status code: {:}".format(e))
-#                     return None
-#         except Exception as e:
-#             LOGGER.info("Failed to fetch metrics. Status code: {:}".format(e))
-#             print(f"Failed to fetch metrics: {str(e)}")
-#             return None
-# # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
\ No newline at end of file
+        if err: 
+            LOGGER.debug('Message delivery failed: {:}'.format(err))
+            print(f'Message delivery failed: {err}')
+        else:
+            LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
+            print(f'Message delivered to topic {msg.topic()}')
diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py
index a2bbee540c3ce348ef52eceb0e776f48a68d94b1..665fa825e3ee31b2e92351d9c5855f627ce40fa1 100644
--- a/src/telemetry/backend/tests/test_TelemetryBackend.py
+++ b/src/telemetry/backend/tests/test_TelemetryBackend.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import threading
 from common.tools.kafka.Variables import KafkaTopic
 from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
 
@@ -25,14 +26,13 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
 
 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
     TelemetryBackendServiceObj = TelemetryBackendService()
-    response = TelemetryBackendServiceObj.RunRequestListener()
+    response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
     LOGGER.debug(str(response))
-    assert isinstance(response, bool)
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index c72e66bdd53f165ebae131e07f51d23e609dd8be..ad99dff12dc641232972f8cff8226878caefd71b 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -161,9 +161,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     # ---------- Independent Method ---------------
     # Listener method is independent of any method (same lifetime as service)
     # continously listens for responses
-    def RunResponseListener(self):
+    def install_servicers(self):
         threading.Thread(target=self.ResponseListener).start()
-        return True
 
     def ResponseListener(self):
         """