diff --git a/my_deploy.sh b/my_deploy.sh
index ee17cb3679f6390115917a005cfaf670585c28c7..e5f2ff904e74412a9a22364c894f3cd691c38f4a 100755
--- a/my_deploy.sh
+++ b/my_deploy.sh
@@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
 #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
 
 # Uncomment to activate Monitoring Framework (new)
-#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
+#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"
 
 # Uncomment to activate BGP-LS Speaker
 #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto
index 4d3a1f216406841344d40712ea04ec82cedf04d0..a97b0ae2b023dfe0a535aa3cb1ba63b00b418371 100644
--- a/proto/kpi_value_api.proto
+++ b/proto/kpi_value_api.proto
@@ -19,19 +19,19 @@ import "context.proto";
 import "kpi_manager.proto";
 
 service KpiValueAPIService {
-	rpc StoreKpiValues  (KpiValueList)      returns (context.Empty)    {}
-	rpc SelectKpiValues (KpiValueFilter)    returns (KpiValueList)     {}
+  rpc StoreKpiValues  (KpiValueList     ) returns (context.Empty   ) {}
+  rpc SelectKpiValues (KpiValueFilter   ) returns (KpiValueList    ) {}
   rpc GetKpiAlarms    (kpi_manager.KpiId) returns (stream KpiAlarms) {}
 }
 
 message KpiValue {
-	kpi_manager.KpiId kpi_id         = 1;
-	context.Timestamp timestamp      = 2;
-	KpiValueType      kpi_value_type = 3;
+  kpi_manager.KpiId kpi_id         = 1;
+  context.Timestamp timestamp      = 2;
+  KpiValueType      kpi_value_type = 3;
 }
 
 message KpiValueList {
-	repeated KpiValue kpi_value_list = 1;
+  repeated KpiValue kpi_value_list = 1;
 }
 
 message KpiValueType {
@@ -47,9 +47,9 @@ message KpiValueType {
 }
 
 message KpiValueFilter {
-	repeated kpi_manager.KpiId kpi_id          = 1;
-	repeated context.Timestamp start_timestamp = 2;
-	repeated context.Timestamp end_timestamp   = 3;
+  repeated kpi_manager.KpiId kpi_id          = 1;
+  repeated context.Timestamp start_timestamp = 2;
+  repeated context.Timestamp end_timestamp   = 3;
 }
 
 message KpiAlarms {
diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..722779824a8c91b5d7cff0337c10c4ea1327a1b6
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-backend.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
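+# Assumes a Kafka broker is reachable at 127.0.0.1:9092 (e.g. a local broker or a port-forward to the cluster's Kafka service).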
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/backend/tests/test_backend.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
index e30d30da623b2d0eee3d925d69a846b4b1f516a3..e74eb4ec198d77688f62004931c69eac31e60f0c 100755
--- a/scripts/run_tests_locally-analytics-frontend.sh
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -18,6 +18,7 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh
index 3953d2a89c6fbe2bd3546e648246b9b018e5fdb0..96ac558bad5f0bf6bc6f5ee90a26cd11fda69273 100755
--- a/scripts/run_tests_locally-kpi-value-API.sh
+++ b/scripts/run_tests_locally-kpi-value-API.sh
@@ -19,8 +19,9 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
-KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
+# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
+# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
     kpi_value_api/tests/test_kpi_value_api.py
diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index 97a06a0d6c16daf94e3e6b30bfc70eca3e7ce3a3..745d77c62849fb946e37e0d3177c94e162bdc6af 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -19,9 +19,14 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 # RCFILE=$PROJECTDIR/coverage/.coveragerc
 # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
+#     kpi_manager/tests/test_unitary.py
 
+# python3 kpi_manager/tests/test_unitary.py
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 
 
 python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
-    telemetry/backend/tests/test_TelemetryBackend.py
+    telemetry/backend/tests/test_backend.py
diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh
index 7506be5e0750b44e37368e86dbbfd00131c0d270..a6447cb4c6bfaa6d80fefac8417df28a960b1943 100755
--- a/scripts/run_tests_locally-telemetry-frontend.sh
+++ b/scripts/run_tests_locally-telemetry-frontend.sh
@@ -18,9 +18,10 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 
-CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+# python3 kpi_manager/tests/test_unitary.py
+export KFK_SERVER_ADDRESS='127.0.0.1:9092'
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
-
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     telemetry/frontend/tests/test_frontend.py
diff --git a/scripts/show_logs_telemetry-backend.sh b/scripts/show_logs_telemetry-backend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..c28083dcbf5c7056145d1a0696116da66b5e9828
--- /dev/null
+++ b/scripts/show_logs_telemetry-backend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c backend
diff --git a/scripts/show_logs_telemetry-frontend.sh b/scripts/show_logs_telemetry-frontend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..821dc275b22ebf7ffc63d2e8c41dfab684407895
--- /dev/null
+++ b/scripts/show_logs_telemetry-frontend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c frontend
diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile
index df5cd7fbde6dc45780fdb3a333594ab11c1ab146..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22 100644
--- a/src/analytics/backend/Dockerfile
+++ b/src/analytics/backend/Dockerfile
@@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
 RUN rm *.proto
 RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
 
-# Install Java (required for PySpark)
-RUN apt-get update && \
-    apt-get install -y default-jdk && \
-    apt-get clean
-
-# Set JAVA_HOME environment variable
-ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
-ENV PATH=$JAVA_HOME/bin:$PATH
-
 # Create component sub-folders, get specific Python packages
 RUN mkdir -p /var/teraflow/analytics/backend
 WORKDIR /var/teraflow/analytics/backend
diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in
index 9df678fe819f33d479b8f5090ca9ac4eb1f4047c..360d94f4668b19feba305df76a65ef70b26e091f 100644
--- a/src/analytics/backend/requirements.in
+++ b/src/analytics/backend/requirements.in
@@ -12,5 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-pyspark==3.5.2
+dask==2024.1.0
+distributed==2024.1.0
+pandas==2.2.3
 confluent-kafka==2.3.*
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 658d237956b4ed3addbbc295ef0d19dd4b977257..a8c790e786f2768091ee3ab8d8d61860f18c2f77 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,18 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import time
 import json
 import logging
 import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
-from analytics.backend.service.SparkStreaming import SparkStreamer
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-
+from threading import Thread, Event
+from .DaskStreaming import DaskStreamer
 
 LOGGER = logging.getLogger(__name__)
 
@@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService):
                                             'group.id'           : 'analytics-frontend',
                                             'auto.offset.reset'  : 'latest'})
 
-    def StartSparkStreamer(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer['input_kpis'] 
-        oper_list     = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
-        thresholds    = analyzer['thresholds']
-        window_size   = analyzer['window_size']
-        window_slider = analyzer['window_slider']
-        print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-            kpi_list, oper_list, thresholds, window_size, window_slider))
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
-            kpi_list, oper_list, thresholds, window_size, window_slider))
-        try:
-            stop_event = threading.Event()
-            thread = threading.Thread(target=SparkStreamer, 
-                            args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
-                                  window_size, window_slider, None ))
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            thread.start()
-            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            return True
-        except Exception as e:
-            print       ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
-            return False
-
-    def StopRequestListener(self, threadInfo: tuple):
-        try:
-            thread, stop_event = threadInfo
-            stop_event.set()
-            thread.join()
-            print      ("Terminating Analytics backend RequestListener")
-            LOGGER.info("Terminating Analytics backend RequestListener")
-            return True
-        except Exception as e:
-            print       ("Failed to terminate analytics backend {:}".format(e))
-            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
-            return False
-
     def install_servicers(self):
         threading.Thread(target=self.RequestListener, args=()).start()
 
@@ -86,6 +48,7 @@ class AnalyticsBackendService(GenericGrpcService):
         listener for requests on Kafka topic.
         """
         LOGGER.info("Request Listener is initiated ...")
+        # print      ("Request Listener is initiated ...")
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
         while True:
@@ -96,34 +59,66 @@ class AnalyticsBackendService(GenericGrpcService):
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
                 else:
-                    print("Consumer error: {}".format(receive_msg.error()))
+                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
+                    # print       ("Consumer error: {:}".format(receive_msg.error()))
                     break
-            analyzer      = json.loads(receive_msg.value().decode('utf-8'))
-            analyzer_uuid = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
-            print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+            try:
+                analyzer      = json.loads(receive_msg.value().decode('utf-8'))
+                analyzer_uuid = receive_msg.key().decode('utf-8')
+                LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                # print       ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
-            if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                self.TerminateAnalyzerBackend(analyzer_uuid)
-            else:
-                self.StartSparkStreamer(analyzer_uuid, analyzer)
-        LOGGER.debug("Stop Event activated. Terminating...")
-        print       ("Stop Event activated. Terminating...")
+                if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
+                    self.StopDaskListener(analyzer_uuid)
+                else:
+                    self.StartDaskListener(analyzer_uuid, analyzer)
+            except Exception as e:
+                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
+                # print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
+
+    def StartDaskListener(self, analyzer_uuid, analyzer):
+        kpi_list      = analyzer[ 'input_kpis'   ] 
+        thresholds    = analyzer[ 'thresholds'   ]
+        window_size   = analyzer[ 'window_size'  ]
+        window_slider = analyzer[ 'window_slider']
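+        # NOTE: window_slider is only logged below; it is not (yet) forwarded to DaskStreamer.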
+
+        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
+            kpi_list, thresholds, window_size, window_slider))
+        # print        ("Received parameters: {:} - {:} - {:} - {:}".format(
+        #     kpi_list, thresholds, window_size, window_slider))
+        try:
+            stop_event = Event()
+            thread     = Thread(
+                target=DaskStreamer,
+                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
+                args=(analyzer_uuid, kpi_list, thresholds, stop_event),
+                kwargs={
+                    "window_size"       : window_size,
+                }
+            )
+            thread.start()
+            self.running_threads[analyzer_uuid] = (thread, stop_event)
+            # print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            return True
+        except Exception as e:
+            # print       ("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            return False
 
-    def TerminateAnalyzerBackend(self, analyzer_uuid):
+    def StopDaskListener(self, analyzer_uuid):
         if analyzer_uuid in self.running_threads:
             try:
                 thread, stop_event = self.running_threads[analyzer_uuid]
                 stop_event.set()
                 thread.join()
                 del self.running_threads[analyzer_uuid]
-                print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
+                # print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                 LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
                 return True
             except Exception as e:
                 LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
                 return False
         else:
-            print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
-            # generate confirmation towards frontend
+            # print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
+            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py
new file mode 100644
index 0000000000000000000000000000000000000000..f09da9949fd0f745f80f782273024f9175db820c
--- /dev/null
+++ b/src/analytics/backend/service/DaskStreaming.py
@@ -0,0 +1,233 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+import json
+from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
+import pandas as pd
+from dask.distributed import Client, LocalCluster
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+
+logging.basicConfig(level=logging.INFO)
+LOGGER = logging.getLogger(__name__)
+
+def SettingKafkaConsumerParams():
+    return {'bootstrap.servers'  : KafkaConfig.get_kafka_address(),
+            'group.id'           : 'analytics-backend',
+            'auto.offset.reset'  : 'latest'}
+
+def GetAggregationMappings(thresholds):
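+    # Map threshold keys of the form '<aggregation>_<metricName>' (e.g. 'mean_latency')
+    # to pandas named-aggregation tuples over the 'kpi_value' column.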
+    agg_dict = {}
+    for threshold_key in thresholds.keys():
+        parts = threshold_key.split('_', 1)
+        if len(parts) != 2:
+            LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
+            continue
+        aggregation, metric_name = parts
+        # Ensure that the aggregation function is valid in pandas
+        if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
+            LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
+            continue
+        agg_dict[threshold_key] = ('kpi_value', aggregation)
+    return agg_dict
+
+def ApplyThresholds(aggregated_df, thresholds):
+    """
+    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
+    on the aggregated DataFrame.
+    Args:       aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
+                thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
+    Returns:    pd.DataFrame: DataFrame with additional threshold columns.
+    """
+    for threshold_key, threshold_values in thresholds.items():
+        if threshold_key not in aggregated_df.columns:
+            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
+            continue
+        if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
+            fall_th, raise_th = threshold_values
+            aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fall_th
+            aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
+        else:
+            LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
+    return aggregated_df
+
+def initialize_dask_client():
+    """
+    Initialize a local Dask cluster and client.
+    """
+    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
+    client = Client(cluster)
+    LOGGER.info(f"Dask Client Initialized: {client}")
+    return client, cluster
+
+def initialize_kafka_producer():
+    return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
+
+def delivery_report(err, msg):
+    if err is not None:
+        LOGGER.error(f"Message delivery failed: {err}")
+    else:
+        LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
+
+def process_batch(batch, agg_mappings, thresholds):
+    """
+    Process a batch of data and apply thresholds.
+    Args:       batch (list of dict): List of messages from Kafka.
+                agg_mappings (dict): Mapping from threshold key to aggregation function.
+                thresholds (dict): Thresholds dictionary.
+    Returns:    list of dict: Processed records ready to be sent to Kafka.
+    """
+    if not batch:
+        LOGGER.info("Empty batch received. Skipping processing.")
+        return []
+
+    df = pd.DataFrame(batch)
+    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
+    df.dropna(subset=['time_stamp'], inplace=True)
+    required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
+    if not required_columns.issubset(df.columns):
+        LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
+        return []
+    if df.empty:
+        LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
+        return []
+
+    # Perform aggregations using named aggregation
+    try:
+        agg_dict = {key: value for key, value in agg_mappings.items()}
+        df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index()
+    except Exception as e:
+        LOGGER.error(f"Aggregation error: {e}")
+        return []
+
+    # Apply thresholds
+    df_thresholded = ApplyThresholds(df_agg, thresholds)
+    df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
+    # Convert aggregated DataFrame to list of dicts
+    result = df_thresholded.to_dict(orient='records')
+    LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
+
+    return result
+
+def produce_result(result, producer, destination_topic):
+    for record in result:
+        try:
+            producer.produce(
+                destination_topic,
+                key=str(record.get('kpi_id', '')),
+                value=json.dumps(record),
+                callback=delivery_report
+            )
+        except KafkaException as e:
+            LOGGER.error(f"Failed to produce message: {e}")
+    producer.flush()
+    LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
+
+def DaskStreamer(key, kpi_list, thresholds, stop_event,
+                window_size="30s", time_stamp_col="time_stamp"):
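+    """
+    Consume KPI values from Kafka, batch them per 'window_size', offload the
+    aggregation and thresholding of each batch to a local Dask cluster, and
+    publish the resulting records to the ALARMS topic.
+    """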
+    client, cluster = initialize_dask_client()
+    consumer_conf   = SettingKafkaConsumerParams()
+    consumer        = Consumer(consumer_conf)
+    consumer.subscribe([KafkaTopic.VALUE.value])
+    producer        = initialize_kafka_producer()
+
+    # Parse window_size to seconds
+    try:
+        window_size_td = pd.to_timedelta(window_size)
+        window_size_seconds = window_size_td.total_seconds()
+    except Exception as e:
+        LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
+        window_size_seconds = 30 
+    LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
+
+    # Extract aggregation mappings from thresholds
+    agg_mappings = GetAggregationMappings(thresholds)
+    if not agg_mappings:
+        LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
+        consumer.close()
+        producer.flush()
+        client.close()
+        cluster.close()
+        return
+    try:
+        batch = []
+        last_batch_time = time.time()
+        LOGGER.info("Starting to consume messages...")
+
+        while not stop_event.is_set():
+            msg = consumer.poll(1.0)
+
+            if msg is None:
+                current_time = time.time()
+                if (current_time - last_batch_time) >= window_size_seconds and batch:
+                    LOGGER.info("Time-based batch threshold reached. Processing batch.")
+                    future = client.submit(process_batch, batch, agg_mappings, thresholds)
+                    future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
+                    batch = []
+                    last_batch_time = current_time
+                continue
+
+            if msg.error():
+                if msg.error().code() == KafkaError._PARTITION_EOF:
+                    LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
+                else:
+                    LOGGER.error(f"Kafka error: {msg.error()}")
+                continue
+
+            try:
+                message_value = json.loads(msg.value().decode('utf-8'))
+            except json.JSONDecodeError as e:
+                LOGGER.error(f"JSON decode error: {e}")
+                continue
+
+            try:
+                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
+                if pd.isna(message_timestamp):
+                    LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
+                    continue
+                window_start = message_timestamp.floor(window_size)
+                message_value['window_start'] = window_start
+            except Exception as e:
+                LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
+                continue
+
+            if message_value['kpi_id'] not in kpi_list:
+                LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
+                continue
+
+            batch.append(message_value)
+
+            current_time = time.time()
+            if (current_time - last_batch_time) >= window_size_seconds and batch:
+                LOGGER.info("Time-based batch threshold reached. Processing batch.")
+                future = client.submit(process_batch, batch, agg_mappings, thresholds)
+                future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
+                batch = []
+                last_batch_time = current_time
+
+    except Exception as e:
+        LOGGER.exception(f"Error in Dask streaming process: {e}")
+    finally:
+        # Process any remaining messages in the batch
+        if batch:
+            LOGGER.info("Processing remaining messages in the batch.")
+            future = client.submit(process_batch, batch, agg_mappings, thresholds)
+            future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
+        consumer.close()
+        producer.flush()
+        LOGGER.info("Kafka consumer and producer closed.")
+        client.close()
+        cluster.close()
+        LOGGER.info("Dask client and cluster closed.")
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index c3b78967efe13eef9a60e19e50e56bdfca4a410d..cdc6c34428a72e5fcf90db3c5656a33c2bb29008 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -16,7 +16,7 @@ import uuid
 import json
 from common.proto.kpi_manager_pb2        import KpiId
 from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
-                                                Analyzer )
+                                                Analyzer, AnalyzerId )
 
 def get_kpi_id_list():
     return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
@@ -38,18 +38,25 @@ def get_threshold_dict():
         op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
     }
 
+def create_analyzer_id():
+    _create_analyzer_id                  = AnalyzerId()
+    # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    return _create_analyzer_id
+
 
 def create_analyzer():
     _create_analyzer                              = Analyzer()
     # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
-    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888" 
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
@@ -70,4 +77,38 @@ def create_analyzer():
     _create_analyzer.parameters['window_slider']   = "30 seconds"     # should be less than window size
     _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. No implemented yet
 
-    return _create_analyzer
\ No newline at end of file
+    return _create_analyzer
+
+def create_analyzer_dask():
+    _create_analyzer                              = Analyzer()
+    _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
+    _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+    
+    _kpi_id = KpiId()
+    # input IDs to analyze
+    # _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888" 
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # output IDs after analysis
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    # parameter
+
+    _threshold_dict = {
+        'mean_latency'  :(20, 30),  'min_latency'   :(00, 10),  'max_latency' :(45, 50),
+        'first_value'   :(00, 50),  'last_value'    :(50, 100), 'std_value' :(0, 90)}
+    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
+    _create_analyzer.parameters['oper_list']       = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()])
+    _create_analyzer.parameters['window_size']     = "10s"     # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
+    _create_analyzer.parameters['window_slider']   = "5s"     # should be less than window size
+    _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. Not implemented yet
+    return _create_analyzer
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index bc0f47eb213f705a71f46de6a45d38b1c6f37e96..86de220a21b4c2c1c38d518c01ae13f33ee200d5 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -19,7 +19,9 @@ from threading import Event, Thread
 from common.tools.kafka.Variables import KafkaTopic
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
 from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
-from .messages import create_analyzer
+from .messages import create_analyzer, create_analyzer_dask
+from threading import Thread, Event
+from ..service.DaskStreaming import DaskStreamer
 
 LOGGER = logging.getLogger(__name__)
 
@@ -34,52 +36,103 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_StartSparkStreamer():
-    LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
-    analyzer_obj = create_analyzer()
-    analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
-    analyzer_to_generate : Dict = {
-        "algo_name"       : analyzer_obj.algorithm_name,
-        "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
-        "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
-        "oper_mode"       : analyzer_obj.operation_mode,
-        "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
-        "window_size"     : analyzer_obj.parameters["window_size"],
-        "window_slider"   : analyzer_obj.parameters["window_slider"],
-        # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
-    }
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
-    assert isinstance(response, bool)
 
-# def test_StartRequestListener():
-#     LOGGER.info('test_RunRequestListener')
+# --- To test Dask Streamer functionality ---
+# def test_StartDaskStreamer():   # Directly from the Streamer class
+#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+#     stop_event = Event()
+#     kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
+#     oper_list = ['avg', 'min', 'max',]
+#     thresholds = {
+#         'avg_value': (10.0, 90.0),
+#         'min_value': (5.0, 95.0),
+#         'max_value': (15.0, 85.0),
+#         'latency'  : (2.0, 10.0)
+#     }
+
+#     # Start the DaskStreamer in a separate thread
+#     streamer_thread = Thread(
+#         target=DaskStreamer,
+#         args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
+#         kwargs={
+#             "window_size": "60s",
+#             "win_slide_duration": "30s",
+#             "time_stamp_col": "time_stamp"
+#         }
+#     )
+#     streamer_thread.start()
+#     try:
+#         while True:
+#             time.sleep(10)
+#     except KeyboardInterrupt:
+#         LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
+#         stop_event.set()
+#         streamer_thread.join()
+#         LOGGER.info("Streamer stopped gracefully.")
+
+# --- To test Start Streamer functionality ---
+# def test_StartDaskStreamer():
+#     LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
+#     analyzer_obj = create_analyzer_dask()
+#     # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
+#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+#     analyzer_to_generate : Dict = {
+#         "algo_name"       : analyzer_obj.algorithm_name,
+#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+#         "oper_mode"       : analyzer_obj.operation_mode,
+#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+#         "oper_list"       : json.loads(analyzer_obj.parameters["oper_list"]),
+#         # "oper_list"       : analyzer_obj.parameters["oper_list"],
+#         "window_size"     : analyzer_obj.parameters["window_size"],
+#         "window_slider"   : analyzer_obj.parameters["window_slider"],
+#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
+#     }
 #     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
+#     response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
+#     assert isinstance(response, bool)
+#     time.sleep(100)
+#     LOGGER.info('Initiating StopRequestListener...')
+#     # AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
 #     LOGGER.debug(str(response)) 
-#     assert isinstance(response, tuple)
+#     assert isinstance(response, bool)
 
-# To test START and STOP communication together
-def test_StopRequestListener():
-    LOGGER.info('test_RunRequestListener')
-    LOGGER.info('Initiating StartRequestListener...')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
-    # LOGGER.debug(str(response_thread))
-    time.sleep(10)
-    LOGGER.info('Initiating StopRequestListener...')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    AnalyticsBackendServiceObj.stop_event = Event()
-    listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
-    listener_thread.start()
+# --- To test Start Streamer functionality ---
+# def test_StartSparkStreamer():
+#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
+#     analyzer_obj = create_analyzer()
+#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+#     analyzer_to_generate : Dict = {
+#         "algo_name"       : analyzer_obj.algorithm_name,
+#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+#         "oper_mode"       : analyzer_obj.operation_mode,
+#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+#         "window_size"     : analyzer_obj.parameters["window_size"],
+#         "window_slider"   : analyzer_obj.parameters["window_slider"],
+#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
+#     }
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
+#     assert isinstance(response, bool)
+
+# --- To TEST StartRequestListenerFunctionality
+# def test_StartRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     AnalyticsBackendServiceObj.stop_event = Event()
+#     listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
+#     listener_thread.start()
 
-    time.sleep(2000)
+#     time.sleep(100)
 
     # AnalyticsBackendServiceObj.stop_event.set()
     # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
     # listener_thread.join(timeout=10)
     # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
-    LOGGER.info('Completed test_RunRequestListener')
+    # LOGGER.info('Completed test_RunRequestListener')
 
 # To test START and STOP communication together
 # def test_StopRequestListener():
diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in
index d81b9ddbeafeff94c830d48ca5594e775b9ce240..0b1ec921b8bb77c0d26e8240585a19ef165f0eec 100644
--- a/src/analytics/frontend/requirements.in
+++ b/src/analytics/frontend/requirements.in
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-apscheduler==3.10.4
+apscheduler==3.10.1
 confluent-kafka==2.3.*
 psycopg2-binary==2.9.*
 SQLAlchemy==1.4.*
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 9ffacecc30fac40bb4899b8889386bc23a7609ac..323113bb0d8234f41961d05a049986296167b96b 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, grpc, json
+import logging, grpc, json, queue
 
 from typing          import Dict
 from confluent_kafka import Producer as KafkaProducer
+from confluent_kafka import KafkaError
 
 from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
 from common.proto.context_pb2                 import Empty
@@ -24,6 +25,8 @@ from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, Analy
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
 from analytics.database.Analyzer_DB           import AnalyzerDB
 from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
+from apscheduler.schedulers.background        import BackgroundScheduler
+from apscheduler.triggers.interval            import IntervalTrigger
 
 
 LOGGER           = logging.getLogger(__name__)
@@ -94,10 +97,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
         try:
             while True:
-                LOGGER.info("entering while...")
-                key, value = self.result_queue.get()  # Wait until a result is available
-                LOGGER.info("In while true ...")
-                yield key, value  # Yield the result to the calling function
+                key, value = self.result_queue.get()
+                yield key, value
         except KeyboardInterrupt:
             LOGGER.warning("Listener stopped manually.")
         finally:
@@ -127,8 +128,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
                 self.result_queue.put((key, value))
             else:
                 LOGGER.info(f"Skipping message with unmatched key: {key}")
-                # value = json.loads(msg.value().decode('utf-8')) # Added for debugging
-                # self.result_queue.put((filter_key, value))             # Added for debugging
         except Exception as e:
             LOGGER.error(f"Error processing Kafka message: {e}")
 
@@ -189,7 +188,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
     def delivery_callback(self, err, msg):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
-            print       ('Message delivery failed: {:}'.format(err))
+            # print       ('Message delivery failed: {:}'.format(err))
         else:
             LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
-            print('Message delivered to topic {:}'.format(msg.topic()))
+            # print('Message delivered to topic {:}'.format(msg.topic()))
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 646de962e8a213582fdb7cd1446ab57bda561a96..e2d39585e434b58c0d48d0061e105a5ebaabe6b9 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
 def create_analyzer_id():
     _create_analyzer_id                  = AnalyzerId()
     # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     return _create_analyzer_id
 
 def create_analyzer():
     _create_analyzer                              = Analyzer()
     # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
-    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
@@ -48,11 +49,12 @@ def create_analyzer():
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     # parameter
     _threshold_dict = {
-        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
-        'first_value' :(00, 10), 'last_value'  :(40, 50), 'stdev_value':(00, 10)}
+        'mean_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value' :(00, 10), 'last_value'  :(40, 50), 'std_value':(00, 10)
+        }
     _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
-    _create_analyzer.parameters['window_size']     = "60 seconds"     # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
-    _create_analyzer.parameters['window_slider']   = "30 seconds"     # should be less than window size
+    _create_analyzer.parameters['window_size']     = "10s"            # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
+    _create_analyzer.parameters['window_slider']   = "5s"             # should be less than window size
     _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. No implemented yet
 
     return _create_analyzer
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index 4583a45ac12acde9da81c4b7a15165db99fb38bb..526c32eb8af98afde6b89e784f62f0a2d0f7f432 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -26,7 +26,6 @@ from common.Settings          import ( get_service_port_grpc, get_env_var_name,
 
 from common.tools.kafka.Variables                        import KafkaTopic
 from common.proto.kpi_value_api_pb2                      import KpiValue
-from common.proto.analytics_frontend_pb2                 import AnalyzerAlarms
 from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
@@ -34,7 +33,7 @@ from analytics.frontend.tests.messages                   import ( create_analyze
 from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
 from apscheduler.schedulers.background                   import BackgroundScheduler
 from apscheduler.triggers.interval                       import IntervalTrigger
-
+from common.proto.analytics_frontend_pb2                 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 
 ###########################
 # Tests Setup
@@ -85,23 +84,21 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 ###########################
 
 # --- "test_validate_kafka_topics" should be executed before the functionality tests ---
-# def test_validate_kafka_topics():
-#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-#     response = KafkaTopic.create_all_topics()
-#     assert isinstance(response, bool)
-
-# # ----- core funtionality test -----
-def test_StartAnalytics(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StartAnalytic START: <<< ')
-    stream = analyticsFrontend_client.StartAnalyzer(create_analyzer())
-    for response in stream:
-        LOGGER.debug(str(response))
-    assert isinstance(response, KpiValue)
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+# ----- core functionality test -----
+# def test_StartAnalytics(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
+#     response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, AnalyzerId)
 
 # To test start and stop listener together
-def test_StartStopAnalyzers(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
-    LOGGER.info('--> StartAnalyzer')
+def test_StartAnalyzers(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
     added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
     LOGGER.debug(str(added_analyzer_id))
     assert isinstance(added_analyzer_id, AnalyzerId)
diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in
index 8ff30ddaad25c39713f2e6f68c8d9aebed74dad0..231dc04e820387c95ffea72cbe67b9f0a9a0865a 100644
--- a/src/analytics/requirements.in
+++ b/src/analytics/requirements.in
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-java==11.0.*
-pyspark==3.5.2
 confluent-kafka==2.3.*
 psycopg2-binary==2.9.*
 SQLAlchemy==1.4.*
diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py
index 58e7d0167044bb461e66b053dcb3999641ea8419..2794edb4a051b38d4cef902fd09aaad5db966179 100644
--- a/src/analytics/tests/test_analytics_db.py
+++ b/src/analytics/tests/test_analytics_db.py
@@ -15,12 +15,13 @@
 
 import logging
 from analytics.database.Analyzer_DB import AnalyzerDB
+from analytics.database.AnalyzerModel import Analyzer
 
 LOGGER = logging.getLogger(__name__)
 
 def test_verify_databases_and_tables():
     LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
-    AnalyzerDBobj = AnalyzerDB()
+    AnalyzerDBobj = AnalyzerDB(Analyzer)
     # AnalyzerDBobj.drop_database()
     # AnalyzerDBobj.verify_tables()
     AnalyzerDBobj.create_database()
diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index cadeec6ed331f599411d6480769985673f8d584d..d1acff7e6dbad64086064a7e2344914cbba114f1 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -90,4 +90,4 @@ class KafkaTopic(Enum):
                 return False
         return True
 
-# create all topics after the deployments (Telemetry and Analytics)
+# TODO: create all topics after the deployments (Telemetry and Analytics)
diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py
index 7b5c45859b6c10056211f9f33df950d9668c11ea..08a2dbf7334c3e4e68c3cfa6c27ce08532521342 100644
--- a/src/kpi_manager/tests/test_messages.py
+++ b/src/kpi_manager/tests/test_messages.py
@@ -27,6 +27,8 @@ def create_kpi_id_request():
 def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
     _create_kpi_request                                    = kpi_manager_pb2.KpiDescriptor()
     _create_kpi_request.kpi_id.kpi_id.uuid                 = str(uuid.uuid4())
+    # _create_kpi_request.kpi_id.kpi_id.uuid                 = "6e22f180-ba28-4641-b190-2287bf448888"
+    # _create_kpi_request.kpi_id.kpi_id.uuid                 = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_kpi_request.kpi_description                    = descriptor_name
     _create_kpi_request.kpi_sample_type                    = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
     _create_kpi_request.device_id.device_uuid.uuid         = 'DEV2' 
diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml
index 3db0538dd071e72a5b66f8489036ff83245ce337..14c8df299b1a4970ec0a4733bcd918bf1485b00d 100644
--- a/src/kpi_value_api/.gitlab-ci.yml
+++ b/src/kpi_value_api/.gitlab-ci.yml
@@ -62,7 +62,8 @@ unit_test kpi-value-api:
       --env ALLOW_ANONYMOUS_LOGIN=yes
       bitnami/zookeeper:latest
     - sleep 10 # Wait for Zookeeper to start
-    - docker run --name kafka -d --network=teraflowbridge -p 9092:9092
+    - >
+      docker run --name kafka -d --network=teraflowbridge -p 9092:9092
       --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
       --env ALLOW_PLAINTEXT_LISTENER=yes
       bitnami/kafka:latest
diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in
index e95d6d8bbd81abca7eb2622e1faf6af473fcdb12..0615fa833f255bf91fd72fc484e40842face7a44 100644
--- a/src/kpi_value_api/requirements.in
+++ b/src/kpi_value_api/requirements.in
@@ -15,4 +15,4 @@
 confluent-kafka==2.3.*
 requests==2.27.*
 prometheus-api-client==0.5.3
-apscheduler==3.10.4
+apscheduler==3.10.1
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index 0f57f88219a74108a555cf87e9bdb98999fd5da2..706e180d5cf65107f5899315cdf75867beb81608 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         self.scheduler      = BackgroundScheduler()
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-                                            'group.id'           : 'analytics-frontend',
+                                            'group.id'           : 'kpi-value-api-frontend',
                                             'auto.offset.reset'  : 'latest'})
         
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
@@ -152,12 +152,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
 
         for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
             response.start_timestamp.timestamp = datetime.strptime(
-                value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
-            response.end_timestamp.timestamp = datetime.strptime(
-                value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
+                value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
             response.kpi_id.kpi_id.uuid  = value['kpi_id']
             for key, threshold in value.items():
-                if "THRESHOLD_" in key:
+                if key not in ['kpi_id', 'window_start']:   # skip non-threshold fields
                     response.alarms[key] = threshold
 
             yield response
@@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                 key, value = self.result_queue.get()  # Wait until a result is available
                 LOGGER.info("In while true ...")
                 yield key, value  # Yield the result to the calling function
-        except KeyboardInterrupt:
-            LOGGER.warning("Listener stopped manually.")
+        except Exception as e:
+            LOGGER.warning("Listener stopped. Error: {:}".format(e))
         finally:
-            self.StopListener()
+            self.scheduler.shutdown()
 
     def response_listener(self, filter_key=None):
         """
@@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
 
         consumer = self.kafka_consumer
         consumer.subscribe([self.listener_topic])
-        msg = consumer.poll(2.0)
-        if msg is None:
-            return
-        elif msg.error():
-            if msg.error().code() != KafkaError._PARTITION_EOF:
-                LOGGER.error(f"Kafka error: {msg.error()}")
-            return
-        try:
-            key = msg.key().decode('utf-8') if msg.key() else None
-            if filter_key is not None and key == filter_key:
-                value = json.loads(msg.value().decode('utf-8'))
-                LOGGER.info(f"Received key: {key}, value: {value}")
-                self.result_queue.put((key, value))
-            else:
-                LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
-        except Exception as e:
-            LOGGER.error(f"Error processing Kafka message: {e}")
+        while True:
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+            elif msg.error():
+                if msg.error().code() != KafkaError._PARTITION_EOF:
+                    LOGGER.error(f"Kafka error: {msg.error()}")
+                break
+            try:
+                key = msg.key().decode('utf-8') if msg.key() else None
+                if filter_key is not None and key == filter_key:
+                    value = json.loads(msg.value().decode('utf-8'))
+                    LOGGER.info(f"Received key: {key}, value: {value}")
+                    self.result_queue.put((key, value))
+                else:
+                    LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
+            except Exception as e:
+                LOGGER.error(f"Error processing Kafka message: {e}")
 
     def delivery_callback(self, err, msg):
         if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py
index ac17f6f987d437ee6dacd7dfdc7a1de7a8965343..c245bb9ef64eaa29dc4d51955ff94adeeeeb8dda 100644
--- a/src/kpi_value_api/tests/test_kpi_value_api.py
+++ b/src/kpi_value_api/tests/test_kpi_value_api.py
@@ -78,12 +78,12 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_GetKpiAlarms(kpi_value_api_client):
-    LOGGER.debug(" >>> test_GetKpiAlarms")
-    stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
-    for response in stream:
-        LOGGER.debug(str(response))
-    assert isinstance(response, KpiAlarms)
+# def test_GetKpiAlarms(kpi_value_api_client):
+#     LOGGER.debug(" >>> test_GetKpiAlarms")
+#     stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
+#     for response in stream:
+#         LOGGER.debug(str(response))
+#     assert isinstance(response, KpiAlarms)
 
 # def test_store_kpi_values(kpi_value_api_client):
 #     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 078fa5896d5fb5033833e0e2ef2248613ef80c18..79a35d343860d19992518c0e8b29e427e5cbbef4 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -17,7 +17,8 @@ import time
 import random
 import logging
 import threading
-from typing import Any, Dict
+from typing   import Any, Dict
+from datetime import datetime, timezone
 # from common.proto.context_pb2 import Empty
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import Consumer as KafkaConsumer
@@ -53,6 +54,8 @@ class TelemetryBackendService(GenericGrpcService):
         """
         listener for requests on Kafka topic.
         """
+        LOGGER.info('Telemetry backend request listener is running ...')
+        # print      ('Telemetry backend request listener is running ...')
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.REQUEST.value])
         while True:
@@ -63,29 +66,33 @@ class TelemetryBackendService(GenericGrpcService):
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
                 else:
-                    print("Consumer error: {}".format(receive_msg.error()))
+                    # print("Consumer error: {}".format(receive_msg.error()))
                     break
-            
-            collector = json.loads(receive_msg.value().decode('utf-8'))
-            collector_id = receive_msg.key().decode('utf-8')
-            LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
-            print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
+            try: 
+                collector = json.loads(receive_msg.value().decode('utf-8'))
+                collector_id = receive_msg.key().decode('utf-8')
+                LOGGER.debug('Received Collector: {:} - {:}'.format(collector_id, collector))
+                # print('Received Collector: {:} - {:}'.format(collector_id, collector))
 
-            if collector['duration'] == -1 and collector['interval'] == -1:
-                self.TerminateCollectorBackend(collector_id)
-            else:
-                self.RunInitiateCollectorBackend(collector_id, collector)
+                if collector['duration'] == -1 and collector['interval'] == -1:
+                    self.TerminateCollectorBackend(collector_id)
+                else:
+                    self.RunInitiateCollectorBackend(collector_id, collector)
+            except Exception as e:
+                LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
+                # print         ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
 
     def TerminateCollectorBackend(self, collector_id):
         if collector_id in self.running_threads:
             thread, stop_event = self.running_threads[collector_id]
             stop_event.set()
             thread.join()
-            print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
+            # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
             del self.running_threads[collector_id]
             self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)          # Termination confirmation to frontend.
         else:
-            print ('Backend collector {:} not found'.format(collector_id))
+            # print ('Backend collector {:} not found'.format(collector_id))
+            LOGGER.warning('Backend collector {:} not found'.format(collector_id))
 
     def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
         stop_event = threading.Event()
@@ -98,10 +105,11 @@ class TelemetryBackendService(GenericGrpcService):
         """
         Method receives collector request and initiates collecter backend.
         """
-        print("Initiating backend for collector: ", collector_id)
+        # print("Initiating backend for collector: ", collector_id)
+        LOGGER.info("Initiating backend for collector: ", collector_id)
         start_time = time.time()
         while not stop_event.is_set():
-            if time.time() - start_time >= collector['duration']:            # condition to terminate backend
+            if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']:            # condition to terminate backend
                 print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
                 self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)       # Termination confirmation to frontend.
                 break
@@ -130,8 +138,7 @@ class TelemetryBackendService(GenericGrpcService):
         Method to extract kpi value.
         """
         measured_kpi_value = random.randint(1,100)                      # TODO: To be extracted from a device
-        print ("Measured Kpi value: {:}".format(measured_kpi_value))
-        # measured_kpi_value = self.fetch_node_exporter_metrics()       # exporter extracted metric value against default KPI
+        # print ("Measured Kpi value: {:}".format(measured_kpi_value))
         self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)
 
     def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
@@ -140,7 +147,7 @@ class TelemetryBackendService(GenericGrpcService):
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
-            "time_stamp": str(time.time()),
+            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
             "kpi_id"    : kpi_id,
             "kpi_value" : measured_kpi_value
         }
@@ -160,7 +167,7 @@ class TelemetryBackendService(GenericGrpcService):
         """
         if err: 
             LOGGER.debug('Message delivery failed: {:}'.format(err))
-            print(f'Message delivery failed: {err}')
+            # print(f'Message delivery failed: {err}')
         else:
             LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
-            print(f'Message delivered to topic {msg.topic()}')
+            # print(f'Message delivered to topic {msg.topic()}')
diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messages.py
similarity index 100%
rename from src/telemetry/backend/tests/messagesBackend.py
rename to src/telemetry/backend/tests/messages.py
diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_backend.py
similarity index 69%
rename from src/telemetry/backend/tests/test_TelemetryBackend.py
rename to src/telemetry/backend/tests/test_backend.py
index 665fa825e3ee31b2e92351d9c5855f627ce40fa1..8bbde9769ae1dfb16a33ef528f74031d2ba94c01 100644
--- a/src/telemetry/backend/tests/test_TelemetryBackend.py
+++ b/src/telemetry/backend/tests/test_backend.py
@@ -26,13 +26,12 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-# def test_validate_kafka_topics():
-#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-#     response = KafkaTopic.create_all_topics()
-#     assert isinstance(response, bool)
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
 
-def test_RunRequestListener():
-    LOGGER.info('test_RunRequestListener')
-    TelemetryBackendServiceObj = TelemetryBackendService()
-    response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
-    LOGGER.debug(str(response))
+# def test_RunRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     TelemetryBackendServiceObj = TelemetryBackendService()
+#     threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
\ No newline at end of file
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index ad99dff12dc641232972f8cff8226878caefd71b..5c569e2ddd1d75dd89f88fe9ae08517330470254 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -153,7 +153,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
         """
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
-            print('Message delivery failed: {:}'.format(err))
+            # print('Message delivery failed: {:}'.format(err))
         # else:
         #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
         #     print('Message delivered to topic {:}'.format(msg.topic()))
@@ -177,7 +177,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                 if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                     continue
                 else:
-                    print("Consumer error: {}".format(receive_msg.error()))
+                    # print("Consumer error: {:}".format(receive_msg.error()))
+                    LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
                     break
             try:
                 collector_id = receive_msg.key().decode('utf-8')
@@ -185,13 +186,17 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                     kpi_value = json.loads(receive_msg.value().decode('utf-8'))
                     self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value'])
                 else:
-                    print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
+                    # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
+                    LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS))
             except Exception as e:
-                print(f"Error extarcting msg key or value: {str(e)}")
+                # print(f"Error extarcting msg key or value: {str(e)}")
+                LOGGER.info("Error extarcting msg key or value: {:}".format(e))
                 continue
 
     def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
         if kpi_id == "-1" and kpi_value == -1:
-            print ("Backend termination confirmation for collector id: ", collector_id)
+            # print ("Backend termination confirmation for collector id: ", collector_id)
+            LOGGER.info("Backend termination confirmation for collector id: ", collector_id)
         else:
-            print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)
+            LOGGER.info("Backend termination confirmation for collector id: ", collector_id)
+            # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)
diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py
index a0e93e8a121b9efaac83f7169419911c8ee6e3ea..e6d8ef439f4ad4764c5a6f8b5f36ec68cbb10867 100644
--- a/src/telemetry/frontend/tests/Messages.py
+++ b/src/telemetry/frontend/tests/Messages.py
@@ -22,15 +22,18 @@ from common.proto.kpi_manager_pb2 import KpiId
 def create_collector_id():
     _collector_id                   = telemetry_frontend_pb2.CollectorId()
     # _collector_id.collector_id.uuid = str(uuid.uuid4())
-    _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c"
+    _collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
     return _collector_id
 
 def create_collector_request():
     _create_collector_request                                = telemetry_frontend_pb2.Collector()
-    _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
-    _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
-    _create_collector_request.duration_s                     = float(random.randint(8, 16))
-    _create_collector_request.interval_s                     = float(random.randint(2, 4))
+    # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) 
+    _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
+    # _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
+    _create_collector_request.kpi_id.kpi_id.uuid             = "6e22f180-ba28-4641-b190-2287bf448888"
+    # _create_collector_request.duration_s                     = float(random.randint(8, 16))
+    _create_collector_request.duration_s                     = -1
+    _create_collector_request.interval_s                     = float(random.randint(3, 5))
     return _create_collector_request
 
 def create_collector_filter():
diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py
index 9c3f9d3a8f545792eb2bb3a371c6c20664d24f69..c3f8091c83f56fd4a134ec092b1e22723040595d 100644
--- a/src/telemetry/frontend/tests/test_frontend.py
+++ b/src/telemetry/frontend/tests/test_frontend.py
@@ -105,47 +105,10 @@ def test_SelectCollectors(telemetryFrontend_client):
     LOGGER.debug(str(response))
     assert isinstance(response, CollectorList)
 
-# ----- Non-gRPC method tests ----- 
-def test_RunResponseListener():
-    LOGGER.info(' >>> test_RunResponseListener START: <<< ')
-    TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
-    response = TelemetryFrontendServiceObj.RunResponseListener()     # becasue Method "run_kafka_listener" is not define in frontend.proto
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
-
-# ------- previous test ----------------
-
-# def test_verify_db_and_table():
-#     LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
-#     _engine = TelemetryEngine.get_engine()
-#     managementDB.create_database(_engine)
-#     managementDB.create_tables(_engine)
-
-# def test_StartCollector(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_StartCollector START: <<< ')
-#     response = telemetryFrontend_client.StartCollector(create_collector_request())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, CollectorId)
-
-# def test_run_kafka_listener():
-#     LOGGER.info(' >>> test_run_kafka_listener START: <<< ')
-#     name_mapping = NameMapping()
-#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
-#     response = TelemetryFrontendServiceObj.run_kafka_listener()     # Method "run_kafka_listener" is not define in frontend.proto
+# # ----- Non-gRPC method tests ----- 
+# def test_RunResponseListener():
+#     LOGGER.info(' >>> test_RunResponseListener START: <<< ')
+#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl()
+#     response = TelemetryFrontendServiceObj.RunResponseListener()     # because method "run_kafka_listener" is not defined in frontend.proto
 #     LOGGER.debug(str(response))
 #     assert isinstance(response, bool)
-
-# def test_StopCollector(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_StopCollector START: <<< ')
-#     _collector_id = telemetryFrontend_client.StartCollector(create_collector_request())
-#     time.sleep(3)   # wait for small amount before call the stopCollecter()
-#     response = telemetryFrontend_client.StopCollector(_collector_id)
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
-
-# def test_select_collectors(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_select_collector requesting <<< ')
-#     response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
-#     LOGGER.info('Received Rows after applying Filter: {:} '.format(response))
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, CollectorList)
\ No newline at end of file
diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in
index a0e78d2bfb7270b9664ad5ba810e2f213d887bf7..503468a662599f0225b293d0ef4c4e4313fa3e0f 100644
--- a/src/telemetry/requirements.in
+++ b/src/telemetry/requirements.in
@@ -12,13 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-anytree==2.8.0
 APScheduler==3.10.1
-influx-line-protocol==0.1.4
 psycopg2-binary==2.9.3
 python-dateutil==2.8.2
 python-json-logger==2.0.2
 pytz==2024.1
-questdb==1.0.1
 requests==2.27.1
-xmltodict==0.12.0
\ No newline at end of file
diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py
index 1b122e4bca266018c01044e2eb8a1ab277b3e3c3..bbc02a2a22fbbae3a1064fc5f9606ec8b29ff0f9 100644
--- a/src/telemetry/tests/test_telemetryDB.py
+++ b/src/telemetry/tests/test_telemetryDB.py
@@ -15,12 +15,13 @@
 
 import logging
 from telemetry.database.Telemetry_DB import TelemetryDB
+from telemetry.database.TelemetryModel import Collector as CollectorModel
 
 LOGGER = logging.getLogger(__name__)
 
 def test_verify_databases_and_tables():
     LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
-    TelemetryDBobj = TelemetryDB()
+    TelemetryDBobj = TelemetryDB(CollectorModel)
     # TelemetryDBobj.drop_database()
     # TelemetryDBobj.verify_tables()
     TelemetryDBobj.create_database()