diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto
index 0a9800d9e5839205e1e45f84e4c8bdafbe93f32f..d4efc084e5f1ea2376e71ef6a15bc9b972c5ac1d 100644
--- a/proto/kpi_sample_types.proto
+++ b/proto/kpi_sample_types.proto
@@ -39,4 +39,14 @@ enum KpiSampleType {
     KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO     = 605;
 
     KPISAMPLETYPE_SERVICE_LATENCY_MS            = 701;
+
+    // Output KPIs produced by the analytics backend (sample type value = corresponding input KPI + 1000, e.g. 701 -> 1701)
+    KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT           = 1101;
+    KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT              = 1102;
+    KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT               = 1103;
+    KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT             = 1201;
+    KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT                = 1202;
+    KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT                 = 1203;
+
+    KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT            = 1701;
 }
diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh
index 1c3386c62084bb42d6ffa2e1349b6f4286820a52..700155a42714bd05069c7c62db9ada09b4125355 100755
--- a/scripts/run_tests_locally-analytics-backend.sh
+++ b/scripts/run_tests_locally-analytics-backend.sh
@@ -18,8 +18,11 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
+
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/backend/tests/test_backend.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
index 6e945406f0ff7b2670a35d5315d0ef428f701988..0cb4dc98da051ba27347be3534f070b80f51fd65 100755
--- a/scripts/run_tests_locally-analytics-frontend.sh
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -21,5 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_analytics?sslmode=require"
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+python3 -m pytest --log-level=DEBUG --log-cli-level=INFO --verbose \
     analytics/frontend/tests/test_frontend.py
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index f3a58feaab8667b266052803dddd1641b8a690f3..92332df6fb5a5c5d3c96d7210deaf1a98a0203b1 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -16,32 +16,46 @@ import time
 import json
 import logging
 import threading
+
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
-from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import Consumer
 from confluent_kafka import KafkaError
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-from threading import Thread, Event
-from .DaskStreaming import DaskStreamer
+from analytics.backend.service.Streamer import DaskStreamer
+from analytics.backend.service.AnalyzerHelper import AnalyzerHelper
+
 
 LOGGER = logging.getLogger(__name__)
 
 class AnalyticsBackendService(GenericGrpcService):
     """
-    Class listens for ...
+    AnalyticsBackendService handles requests coming from the AnalyticsFrontendService.
+    It listens on the Kafka request topic and starts or stops a DaskStreamer per analyzer accordingly.
+    It also initializes the shared Kafka producer and the Dask cluster used by the streamers.
     """
-    def __init__(self, cls_name : str = __name__) -> None:
+    def __init__(self, cls_name : str = __name__, n_workers=1, threads_per_worker=1) -> None:
         LOGGER.info('Init AnalyticsBackendService')
         port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
         super().__init__(port, cls_name=cls_name)
-        self.running_threads = {}       # To keep track of all running analyzers 
-        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
-                                            'group.id'           : 'analytics-frontend',
-                                            'auto.offset.reset'  : 'latest'})
+        self.active_streamers = {}
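+        # active_streamers maps analyzer_uuid -> running DaskStreamer (see StartStreamer/StopStreamer below).
+        # The Kafka producer and Dask cluster initialized next are shared by all streamers started by this service.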
+        self.central_producer = AnalyzerHelper.initialize_kafka_producer()  # Multi-threaded producer
+        self.cluster          = AnalyzerHelper.initialize_dask_cluster(
+                                        n_workers, threads_per_worker) # Local cluster
+        self.request_consumer = Consumer({
+            'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+            'group.id'          : 'analytics-backend',
+            'auto.offset.reset' : 'latest',
+            })
+
 
     def install_servicers(self):
-        threading.Thread(target=self.RequestListener, args=()).start()
+        threading.Thread(
+            target=self.RequestListener,
+            args=()
+        ).start()
 
     def RequestListener(self):
         """
@@ -49,7 +63,7 @@ class AnalyticsBackendService(GenericGrpcService):
         """
         LOGGER.info("Request Listener is initiated ...")
         # print      ("Request Listener is initiated ...")
-        consumer = self.kafka_consumer
+        consumer = self.request_consumer
         consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
         while True:
             receive_msg = consumer.poll(2.0)
@@ -60,65 +74,96 @@ class AnalyticsBackendService(GenericGrpcService):
                     continue
                 else:
                     LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
-                    # print       ("Consumer error: {:}".format(receive_msg.error()))
                     break
             try:
                 analyzer      = json.loads(receive_msg.value().decode('utf-8'))
                 analyzer_uuid = receive_msg.key().decode('utf-8')
-                LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
-                # print       ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+                LOGGER.info('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
 
                 if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
-                    self.StopDaskListener(analyzer_uuid)
+                    if self.StopStreamer(analyzer_uuid):
+                        LOGGER.info("Dask Streamer stopped.")
+                    else:
+                        LOGGER.error("Failed to stop Dask Streamer.")
                 else:
-                    self.StartDaskListener(analyzer_uuid, analyzer)
+                    if self.StartStreamer(analyzer_uuid, analyzer):
+                        LOGGER.info("Dask Streamer started.")
+                    else:
+                        LOGGER.error("Failed to start Dask Streamer.")
             except Exception as e:
                 LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
-                # print         ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
-
-    def StartDaskListener(self, analyzer_uuid, analyzer):
-        kpi_list      = analyzer[ 'input_kpis'   ] 
-        thresholds    = analyzer[ 'thresholds'   ]
-        window_size   = analyzer[ 'window_size'  ]
-        window_slider = analyzer[ 'window_slider']
-
-        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
-            kpi_list, thresholds, window_size, window_slider))
-        # print        ("Received parameters: {:} - {:} - {:} - {:}".format(
-        #     kpi_list, thresholds, window_size, window_slider))
+
+
+    def StartStreamer(self, analyzer_uuid : str, analyzer : dict):
+        """
+        Start the DaskStreamer with the given parameters.
+        """
+        if analyzer_uuid in self.active_streamers:
+            LOGGER.warning("Dask Streamer already running with the given analyzer_uuid: {:}".format(analyzer_uuid))
+            return False
         try:
-            stop_event = Event()
-            thread     = Thread(
-                target=DaskStreamer,
-                # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event),
-                args=(analyzer['output_kpis'][0] , kpi_list, thresholds, stop_event),
-                kwargs={
-                    "window_size"       : window_size,
-                }
+            streamer = DaskStreamer(
+                key               = analyzer_uuid,
+                input_kpis        = analyzer['input_kpis' ],
+                output_kpis       = analyzer['output_kpis'],
+                thresholds        = analyzer['thresholds' ],
+                batch_size        = analyzer['batch_size' ],
+                window_size       = analyzer['window_size'],
+                cluster_instance  = self.cluster,
+                producer_instance = self.central_producer,
             )
-            thread.start()
-            self.running_threads[analyzer_uuid] = (thread, stop_event)
-            # print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            streamer.start()
+            LOGGER.info(f"Streamer started with analyzer Id: {analyzer_uuid}")
+
+            # Stop the streamer after the given duration
+            if analyzer['duration'] > 0:
+                def stop_after_duration():
+                    time.sleep(analyzer['duration'])
+                    LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
+                    if not self.StopStreamer(analyzer_uuid):
+                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")
+
+                duration_thread = threading.Thread(target=stop_after_duration, daemon=True)
+                duration_thread.start()
+
+            self.active_streamers[analyzer_uuid] = streamer
+            return True
+        except Exception as e:
+            LOGGER.error("Failed to start Dask Streamer. ERROR: {:}".format(e))
+            return False
+
+    def StopStreamer(self, analyzer_uuid : str):
+        """
+        Stop the DaskStreamer with the given analyzer_uuid.
+        """
+        try:
+            if analyzer_uuid not in self.active_streamers:
+                LOGGER.warning("Dask Streamer not found with the given analyzer_uuid: {:}".format(analyzer_uuid))
+                return False
+            LOGGER.info(f"Terminating streamer with Analyzer Id: {analyzer_uuid}")
+            streamer = self.active_streamers[analyzer_uuid]
+            streamer.stop()
+            streamer.join()
+            del self.active_streamers[analyzer_uuid]
+            LOGGER.info(f"Streamer with analyzer_uuid '{analyzer_uuid}' has been trerminated sucessfully.")
             return True
         except Exception as e:
-            # print       ("Failed to initiate Analyzer backend: {:}".format(e))
-            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to stop Dask Streamer. ERROR: {:}".format(e))
             return False
 
-    def StopDaskListener(self, analyzer_uuid):
-        if analyzer_uuid in self.running_threads:
+    def close(self):        # TODO: Is this function needed?
+        """
+        Close the producer and cluster cleanly.
+        """
+        if self.central_producer:
             try:
-                thread, stop_event = self.running_threads[analyzer_uuid]
-                stop_event.set()
-                thread.join()
-                del self.running_threads[analyzer_uuid]
-                # print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
-                return True
+                self.central_producer.flush()
+                LOGGER.info("Kafka producer flushed and closed.")
             except Exception as e:
-                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
-                return False
-        else:
-            # print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
-            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
+                LOGGER.error(f"Error closing Kafka producer: {e}")
+        if self.cluster:
+            try:
+                self.cluster.close()
+                LOGGER.info("Dask cluster closed.")
+            except Exception as e:
+                LOGGER.error(f"Error closing Dask cluster: {e}")
diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py
new file mode 100644
index 0000000000000000000000000000000000000000..a05b1a0b772d1d6910e7487c8260fba894aba7e0
--- /dev/null
+++ b/src/analytics/backend/service/AnalyzerHandlers.py
@@ -0,0 +1,131 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from enum import Enum
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+class Handlers(Enum):
+    AGGREGATION_HANDLER = "AggregationHandler"
+    UNSUPPORTED_HANDLER = "UnsupportedHandler"
+
+    @classmethod
+    def is_valid_handler(cls, handler_name):
+        return handler_name in cls._value2member_map_
+
+# This function is kept at module level (not inside a class) so that Dask can serialize it when submitting tasks.
+def threshold_handler(key, aggregated_df, thresholds):
+    """
+    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
+    on the aggregated DataFrame.
+
+    Args:
+        key (str): Key for the aggregated DataFrame.
+        aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
+        thresholds (dict): Thresholds dictionary whose keys are metric/aggregation names and whose values are [fail_th, raise_th] lists.
+
+    Returns:
+        pd.DataFrame: DataFrame with additional threshold columns.
+    """
+    for metric_name, threshold_values in thresholds.items():
+        # Ensure the metric column exists in the DataFrame
+        if metric_name not in aggregated_df.columns:
+            logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
+            continue
+        
+        # Ensure the threshold values are a [fail_th, raise_th] list of length 2
+        if isinstance(threshold_values, list) and len(threshold_values) == 2:
+            fail_th, raise_th = threshold_values
+            
+            # Add threshold columns with updated naming
+            aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
+            aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fail_th
+        else:
+            logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a list of length 2. Skipping threshold application.")
+    return aggregated_df
+
+def aggregation_handler(
+        batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
+    ):
+    """
+      Process a batch of data and calculate aggregated values for each input KPI
+      and maps them to the output KPIs. """
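+    # Each record in 'batch' is a dict carrying at least 'kpi_id' and 'kpi_value'
+    # (see get_batch() in the test messages for an illustrative payload).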
+
+    logger.info(f"({batch_type_name}) Processing batch for key: {key}")
+    if not batch:
+        logger.info("Empty batch received. Skipping processing.")
+        return []
+    else:
+        logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
+        
+        # Convert data into a DataFrame
+        df = pd.DataFrame(batch)
+
+        # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
+        df = df[df['kpi_id'].isin(input_kpi_list)].copy()
+
+        if df.empty:
+            logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
+            return []
+
+        # Define all possible aggregation methods
+        aggregation_methods = {
+            "min"     : ('kpi_value', 'min'),
+            "max"     : ('kpi_value', 'max'),
+            "avg"     : ('kpi_value', 'mean'),
+            "first"   : ('kpi_value', lambda x: x.iloc[0]),
+            "last"    : ('kpi_value', lambda x: x.iloc[-1]),
+            "variance": ('kpi_value', 'var'),
+            "count"   : ('kpi_value', 'count'),
+            "range"   : ('kpi_value', lambda x: x.max() - x.min()),
+            "sum"     : ('kpi_value', 'sum'),
+        }
+
+        # Process each KPI-specific task parameter
+        for kpi_index, kpi_id in enumerate(input_kpi_list):
+
+            # logger.info(f"1.Processing KPI: {kpi_id}")
+            kpi_task_parameters = thresholds["task_parameter"][kpi_index]
+            
+            # Get valid task parameters for this KPI
+            valid_task_parameters = [
+                method for method in kpi_task_parameters.keys() 
+                if method in aggregation_methods
+            ]
+
+            # Select the aggregation methods based on valid task parameters
+            selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
+
+            # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
+            kpi_df = df[df['kpi_id'] == kpi_id]
+
+            # Check if kpi_df is not empty before applying the aggregation methods
+            if not kpi_df.empty:
+                agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
+
+                # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
+
+                agg_df['kpi_id'] = output_kpi_list[kpi_index]
+
+                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
+                result = threshold_handler(key, agg_df, kpi_task_parameters)
+
+                return result.to_dict(orient='records')
+            else:
+                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
+                continue
+        return []
diff --git a/src/analytics/backend/service/AnalyzerHelper.py b/src/analytics/backend/service/AnalyzerHelper.py
new file mode 100644
index 0000000000000000000000000000000000000000..15a45aee68341c599905983efd79737a9d4929ab
--- /dev/null
+++ b/src/analytics/backend/service/AnalyzerHelper.py
@@ -0,0 +1,67 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from dask.distributed import Client, LocalCluster
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer, Producer
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class AnalyzerHelper:
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def initialize_dask_client(cluster_instance):
+        """Initialize a local Dask client."""
+        if cluster_instance is None:
+            logger.error("Dask Cluster is not initialized. Exiting.")
+            return None
+        client = Client(cluster_instance)
+        logger.info(f"Dask Client Initialized: {client}")
+        return client
+
+    @staticmethod
+    def initialize_dask_cluster(n_workers=1, threads_per_worker=2):
+        """Initialize a local Dask cluster"""
+        cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
+        logger.info(f"Dask Cluster Initialized: {cluster}")
+        return cluster
+
+    @staticmethod
+    def initialize_kafka_consumer():    # TODO: update to receive topic and group_id as parameters
+        """Initialize the Kafka consumer."""
+        consumer_conf = {
+            'bootstrap.servers': KafkaConfig.get_kafka_address(),
+            'group.id': 'analytics-backend',
+            'auto.offset.reset': 'latest'
+        }
+        consumer = Consumer(consumer_conf)
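+        # Subscribe to the KPI value topic; the DaskStreamer aggregates the samples consumed from it.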
+        consumer.subscribe([KafkaTopic.VALUE.value])
+        return consumer
+
+    @staticmethod
+    def initialize_kafka_producer():
+        """Initialize the Kafka producer."""
+        return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
+
+    @staticmethod
+    def delivery_report(err, msg):
+        if err is not None:
+            logger.error(f"Message delivery failed: {err}")
+        else:
+            logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py
deleted file mode 100644
index 79dee7ef972a8b5546e3b38289c4fdb5b4bcc0d1..0000000000000000000000000000000000000000
--- a/src/analytics/backend/service/DaskStreaming.py
+++ /dev/null
@@ -1,268 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import time
-import json
-from confluent_kafka import Consumer, Producer, KafkaException, KafkaError
-import pandas as pd
-from dask.distributed import Client, LocalCluster
-from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
-
-logging.basicConfig(level=logging.INFO)
-LOGGER = logging.getLogger(__name__)
-
-def SettingKafkaConsumerParams():
-    return {'bootstrap.servers'  : KafkaConfig.get_kafka_address(),
-            'group.id'           : 'analytics-backend',
-            'auto.offset.reset'  : 'latest'}
-
-def GetAggregationMappings(thresholds):
-    agg_dict = {}
-    for threshold_key in thresholds.keys():
-        parts = threshold_key.split('_', 1)
-        if len(parts) != 2:
-            LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '<aggregation>_<metricName>' format. Skipping.")
-            continue
-        aggregation, metric_name = parts
-        # Ensure that the aggregation function is valid in pandas
-        if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']:
-            LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.")
-            continue
-        agg_dict[threshold_key] = ('kpi_value', aggregation)
-    return agg_dict
-
-
-def ApplyThresholds(aggregated_df, thresholds):
-    """
-    Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
-    on the aggregated DataFrame.
-    Args:       aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
-                thresholds (dict): Thresholds dictionary with keys in the format '<aggregation>_<metricName>'.
-    Returns:    pd.DataFrame: DataFrame with additional threshold columns.
-    """
-    for threshold_key, threshold_values in thresholds.items():
-        if threshold_key not in aggregated_df.columns:
-
-            LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
-            continue
-        if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
-            fail_th, raise_th = threshold_values
-            aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
-            aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
-            aggregated_df["value"] = aggregated_df[threshold_key]
-        else:
-            LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
-    return aggregated_df
-
-def initialize_dask_client():
-    """
-    Initialize a local Dask cluster and client.
-    """
-    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
-    client = Client(cluster)
-    LOGGER.info(f"Dask Client Initialized: {client}")
-    return client, cluster
-
-def initialize_kafka_producer():
-    return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
-
-def delivery_report(err, msg):
-    if err is not None:
-        LOGGER.error(f"Message delivery failed: {err}")
-    else:
-        LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
-
-def process_batch(batch, agg_mappings, thresholds, key):
-    """
-    Process a batch of data and apply thresholds.
-    Args:       batch (list of dict): List of messages from Kafka.
-                agg_mappings (dict): Mapping from threshold key to aggregation function.
-                thresholds (dict): Thresholds dictionary.
-    Returns:    list of dict: Processed records ready to be sent to Kafka.
-    """
-    if not batch:
-        LOGGER.info("Empty batch received. Skipping processing.")
-        return []
-
-
-    df = pd.DataFrame(batch)
-    LOGGER.info(f"df {df} ")
-    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
-    df.dropna(subset=['time_stamp'], inplace=True)
-    LOGGER.info(f"df {df} ")
-    required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
-    if not required_columns.issubset(df.columns):
-        LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.")
-        return []
-    if df.empty:
-        LOGGER.info("No data after filtering by KPI IDs. Skipping processing.")
-        return []
-
-    # Perform aggregations using named aggregation
-    try:
-        agg_dict = {key: value for key, value in agg_mappings.items()}
-
-        df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
-
-        #example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')
-
-        #given that threshold has 1 value
-        second_value_tuple = next(iter(agg_dict.values()))[1]
-        #in case we have multiple thresholds!
-        #second_values_tuples = [value[1] for value in agg_dict.values()]
-        if second_value_tuple=="min":
-            df_agg = df_agg_.min(numeric_only=True).to_frame().T
-        elif second_value_tuple == "max":
-            df_agg = df_agg_.max(numeric_only=True).to_frame().T
-        elif second_value_tuple == "std":
-            df_agg = df_agg_.sted(numeric_only=True).to_frame().T
-        else:
-            df_agg = df_agg_.mean(numeric_only=True).to_frame().T
-
-        # Assign the first value of window_start from the original aggregated data
-        df_agg['window_start'] = df_agg_['window_start'].iloc[0]
-
-        # Reorder columns to place 'window_start' first if needed
-        cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
-        df_agg = df_agg[cols]
-
-    except Exception as e:
-        LOGGER.error(f"Aggregation error: {e}")
-        return []
-
-    # Apply thresholds
-
-
-    df_thresholded = ApplyThresholds(df_agg, thresholds)
-    df_thresholded['kpi_id'] = key
-    df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
-    # Convert aggregated DataFrame to list of dicts
-    result = df_thresholded.to_dict(orient='records')
-    LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
-    return result
-
-def produce_result(result, producer, destination_topic):
-    for record in result:
-        try:
-            producer.produce(
-                destination_topic,
-                key=str(record.get('kpi_id', '')),
-                value=json.dumps(record),
-                callback=delivery_report
-            )
-        except KafkaException as e:
-            LOGGER.error(f"Failed to produce message: {e}")
-    producer.flush()
-    LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
-
-def DaskStreamer(key, kpi_list, thresholds, stop_event,
-                window_size="30s", time_stamp_col="time_stamp"):
-    client, cluster = initialize_dask_client()
-    consumer_conf   = SettingKafkaConsumerParams()
-    consumer        = Consumer(consumer_conf)
-    consumer.subscribe([KafkaTopic.VALUE.value])
-    producer        = initialize_kafka_producer()
-
-    # Parse window_size to seconds
-    try:
-        window_size_td = pd.to_timedelta(window_size)
-        window_size_seconds = window_size_td.total_seconds()
-    except Exception as e:
-        LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}")
-        window_size_seconds = 30 
-    LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.")
-
-    # Extract aggregation mappings from thresholds
-    agg_mappings = GetAggregationMappings(thresholds)
-    if not agg_mappings:
-        LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.")
-        consumer.close()
-        producer.flush()
-        client.close()
-        cluster.close()
-        return
-    try:
-        batch = []
-        last_batch_time = time.time()
-        LOGGER.info("Starting to consume messages...")
-
-        while not stop_event.is_set():
-            msg = consumer.poll(1.0)
-
-            if msg is None:
-                current_time = time.time()
-                if (current_time - last_batch_time) >= window_size_seconds and batch:
-                    LOGGER.info("Time-based batch threshold reached. Processing batch.")
-                    future = client.submit(process_batch, batch, agg_mappings, thresholds)
-                    future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
-                    batch = []
-                    last_batch_time = current_time
-                continue
-
-            if msg.error():
-                if msg.error().code() == KafkaError._PARTITION_EOF:
-                    LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
-                else:
-                    LOGGER.error(f"Kafka error: {msg.error()}")
-                continue
-
-            try:
-                message_value = json.loads(msg.value().decode('utf-8'))
-            except json.JSONDecodeError as e:
-                LOGGER.error(f"JSON decode error: {e}")
-                continue
-
-            try:
-                message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
-                LOGGER.warning(f"message_timestamp: {message_timestamp}. Skipping message.")
-
-                if pd.isna(message_timestamp):
-                    LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.")
-                    continue
-                window_start = message_timestamp.floor(window_size)
-                LOGGER.warning(f"window_start: {window_start}. Skipping message.")
-                message_value['window_start'] = window_start
-            except Exception as e:
-                LOGGER.error(f"Error processing timestamp: {e}. Skipping message.")
-                continue
-
-            if message_value['kpi_id'] not in kpi_list:
-                LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.")
-                continue
-
-            batch.append(message_value)
-
-            current_time = time.time()
-            if (current_time - last_batch_time) >= window_size_seconds and batch:
-                LOGGER.info("Time-based batch threshold reached. Processing batch.")
-                future = client.submit(process_batch, batch, agg_mappings, thresholds, key)
-                future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
-                batch = []
-                last_batch_time = current_time
-
-    except Exception as e:
-        LOGGER.exception(f"Error in Dask streaming process: {e}")
-    finally:
-        # Process any remaining messages in the batch
-        if batch:
-            LOGGER.info("Processing remaining messages in the batch.")
-            future = client.submit(process_batch, batch, agg_mappings, thresholds)
-            future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value))
-        consumer.close()
-        producer.flush()
-        LOGGER.info("Kafka consumer and producer closed.")
-        client.close()
-        cluster.close()
-        LOGGER.info("Dask client and cluster closed.")
diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py
new file mode 100644
index 0000000000000000000000000000000000000000..c72359db2e5de7f9165442deb9d52bfde415b289
--- /dev/null
+++ b/src/analytics/backend/service/Streamer.py
@@ -0,0 +1,162 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import json
+import threading
+import logging
+
+from confluent_kafka                            import KafkaException, KafkaError
+from common.tools.kafka.Variables               import KafkaTopic
+from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
+from analytics.backend.service.AnalyzerHelper   import AnalyzerHelper
+
+
+logger = logging.getLogger(__name__)
+
+
+class DaskStreamer(threading.Thread):
+    def __init__(self, key, input_kpis, output_kpis, thresholds,
+                 batch_size        = 5,
+                 window_size       = None,
+                 cluster_instance  = None,
+                 producer_instance = None,  # created lazily in __init__ if not provided (avoids opening a producer at import time)
+                 ):
+        super().__init__()
+        self.key         = key
+        self.input_kpis  = input_kpis
+        self.output_kpis = output_kpis
+        self.thresholds  = thresholds
+        self.window_size = window_size
+        self.batch_size  = batch_size
+        self.running     = True
+        self.batch       = []
+
+        # Initialize Kafka and Dask components
+        self.client   = AnalyzerHelper.initialize_dask_client(cluster_instance)
+        self.consumer = AnalyzerHelper.initialize_kafka_consumer()      # Single-threaded consumer
+        self.producer = producer_instance if producer_instance is not None \
+                            else AnalyzerHelper.initialize_kafka_producer()
+
+        logger.info("Dask Streamer initialized.")
+
+    def run(self):
+        """Main method to start the DaskStreamer."""
+        try:
+            logger.info("Starting Dask Streamer")
+            last_batch_time = time.time()
+            while True:
+                if not self.consumer:
+                    logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
+                    break
+                if not self.running:
+                    logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
+                    break
+                if not self.client:
+                    logger.warning("Dask client is not running. Exiting loop.")
+                    break
+                message = self.consumer.poll(timeout=2.0)
+                if message is None:
+                    # logger.info("No new messages received.")
+                    continue
+                if message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
+                    else:
+                        raise KafkaException(message.error())
+                else:
+                    try:
+                        value = json.loads(message.value())
+                    except json.JSONDecodeError:
+                        logger.error(f"Failed to decode message: {message.value()}")
+                        continue
+                    self.batch.append(value)
+
+                # Window size takes precedence over batch size
+                if self.window_size is None:
+                    if len(self.batch) >= self.batch_size:  # No window size provided: process whenever the batch size is reached
+                        logger.info(f"Processing based on batch size {self.batch_size}.")
+                        self.task_handler_selector()
+                        self.batch = []
+                else:
+                    # Process based on window size
+                    current_time = time.time()
+                    if (current_time - last_batch_time) >= self.window_size and self.batch:
+                        logger.info(f"Processing based on window size {self.window_size}.")
+                        self.task_handler_selector()
+                        self.batch = []
+                        last_batch_time = current_time
+
+        except Exception as e:
+            logger.exception(f"Error in Dask streaming process: {e}")
+        finally:
+            self.stop()
+            logger.info(">>> Exiting Dask Streamer...")
+
+    def task_handler_selector(self):
+        """Select the task handler based on the task type."""
+        logger.info(f"Batch to be processed: {self.batch}")
+        if Handlers.is_valid_handler(self.thresholds["task_type"]):
+            if self.client is not None and self.client.status == 'running':
+                try:
+                    future = self.client.submit(aggregation_handler, "batch size", self.key,
+                                                    self.batch, self.input_kpis, self.output_kpis, self.thresholds)
+                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
+                except Exception as e:
+                    logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
+            else:
+                logger.warning("Dask client is not running. Skipping processing.")
+        else:
+            logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
+
+    def produce_result(self, result, destination_topic):
+        """Produce results to the Kafka topic."""
+        if not result:
+            logger.warning("Nothing to produce. Skipping.")
+            return
+        for record in result:
+            try:
+                self.producer.produce(
+                    destination_topic,
+                    key=str(record.get('kpi_id', '')),
+                    value=json.dumps(record),
+                    callback=AnalyzerHelper.delivery_report
+                )
+            except KafkaException as e:
+                logger.error(f"Failed to produce message: {e}")
+        self.producer.flush()
+        logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
+
+    def stop(self):
+        """Clean up Kafka and Dask thread resources."""
+        if not self.running:
+            logger.info("Dask Streamer is already stopped.")
+            return
+        self.running = False
+        logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
+        time.sleep(5)       # Waiting time for running tasks to complete
+        if self.consumer:
+            try:
+                self.consumer.close()
+                logger.info("Kafka consumer closed.")
+            except Exception as e:
+                logger.error(f"Error closing Kafka consumer: {e}")
+
+        if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
+            try:
+                self.client.close()
+                logger.info("Dask client closed.")
+            except Exception as e:
+                logger.error(f"Error closing Dask client: {e}")
+
+# TODO: Maybe use a single streamer for all analyzers?
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
index 55d966dfbbae2318a5f774049b02f5b340070113..2ff1e93536e94863ba9eaaf76d35f7453ba95727 100644
--- a/src/analytics/backend/tests/messages.py
+++ b/src/analytics/backend/tests/messages.py
@@ -53,8 +53,8 @@ def create_analyzer():
     _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
     _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
     
-    _kpi_id = KpiId()
     # input IDs to analyze
+    _kpi_id = KpiId()
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _kpi_id.kpi_id.uuid              = "5716c369-932b-4a02-b4c7-6a2e808b92d7"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
@@ -63,11 +63,14 @@ def create_analyzer():
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _create_analyzer.input_kpi_ids.append(_kpi_id)
+
     # output IDs after analysis
+    _kpi_id = KpiId()
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _create_analyzer.output_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
     _create_analyzer.output_kpi_ids.append(_kpi_id)
+    
     # parameter
     _threshold_dict = {
         # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
diff --git a/src/analytics/backend/tests/messages_analyzer.py b/src/analytics/backend/tests/messages_analyzer.py
new file mode 100644
index 0000000000000000000000000000000000000000..4a119d948864d74fc650983e96406fedd3b73e6a
--- /dev/null
+++ b/src/analytics/backend/tests/messages_analyzer.py
@@ -0,0 +1,64 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pandas as pd
+from analytics.backend.service.AnalyzerHandlers import Handlers
+
+def get_input_kpi_list():
+    return ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
+
+def get_output_kpi_list():
+    return ["1e22f180-ba28-4641-b190-2287bf441616", "6e22f180-ba28-4641-b190-2287bf181818", 'kpi_4']
+
+def get_thresholds():
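+    # task_parameter[i] applies to the input/output KPI at index i; each value is a [fail_th, raise_th] pair.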
+    return {
+        "task_type": Handlers.AGGREGATION_HANDLER.value,
+        "task_parameter": [
+            {"last":  [40, 80], "variance": [300, 500]},
+            {"count": [2,  4],  "max":      [70,  100]},
+            {"min":   [10, 20], "avg":      [50,  70]},
+        ],
+    }
+
+def get_duration():
+    return 40
+
+def get_windows_size():
+    return None
+
+def get_batch_size():
+    return 10
+
+def get_interval():
+    return 5
+
+def get_batch():
+    return [
+        {"time_stamp": "2025-01-13T08:44:10Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 46.72},
+        {"time_stamp": "2025-01-13T08:44:12Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 65.22},
+        {"time_stamp": "2025-01-13T08:44:14Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 54.24},
+        {"time_stamp": "2025-01-13T08:44:16Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 57.67},
+        {"time_stamp": "2025-01-13T08:44:18Z", "kpi_id": "1e22f180-ba28-4641-b190-2287bf446666", "kpi_value": 38.6},
+        {"time_stamp": "2025-01-13T08:44:20Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 38.9},
+        {"time_stamp": "2025-01-13T08:44:22Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 52.44},
+        {"time_stamp": "2025-01-13T08:44:24Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 47.76},
+        {"time_stamp": "2025-01-13T08:44:26Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 33.71},
+        {"time_stamp": "2025-01-13T08:44:28Z", "kpi_id": "efef4d95-1cf1-43c4-9742-95c283ddd7a6", "kpi_value": 64.44},
+    ]
+
+def get_agg_df():
+    data = [ 
+        {"kpi_id": "1e22f180-ba28-4641-b190-2287bf441616", "last": 47.76, "variance": 970.41},
+    ]
+    return pd.DataFrame(data)
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 4aa9df5fae9849ee429361603a35b2fb8eaa4d23..91d53000d408981896607a0b4af974100a916d15 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -12,148 +12,290 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time, json
-from typing import Dict
+import time
+import json
+import pytest
 import logging
-from threading import Event, Thread
-from common.tools.kafka.Variables import KafkaTopic
+import pandas as pd
+
+from unittest.mock      import MagicMock, patch
+from .messages_analyzer import get_batch, get_input_kpi_list, get_output_kpi_list, get_thresholds, \
+                               get_windows_size, get_batch_size, get_agg_df, get_duration
+
+from common.tools.kafka.Variables                      import KafkaTopic
+from analytics.backend.service.Streamer                import DaskStreamer
+from analytics.backend.service.AnalyzerHandlers        import aggregation_handler, threshold_handler
 from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
-from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
-from .messages import create_analyzer, create_analyzer_dask
-from threading import Thread, Event
-from ..service.DaskStreaming import DaskStreamer
 
-LOGGER = logging.getLogger(__name__)
 
+logger = logging.getLogger(__name__)
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s')
+
+
+# ----
+# Test fixtures and helper functions
+# ----
+
+@pytest.fixture(autouse=True)
+def log_all_methods(request):
+    '''
+    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
+    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
+    '''
+    logger.info(f" >>>>> Starting test: {request.node.name} ")
+    yield
+    logger.info(f" <<<<< Finished test: {request.node.name} ")
+
+@pytest.fixture
+def mock_kafka_producer():
+    mock_producer         = MagicMock()
+    mock_producer.produce = MagicMock()
+    mock_producer.flush   = MagicMock()
+    return mock_producer
+
+@pytest.fixture
+def mock_dask_cluster():
+    mock_cluster       = MagicMock()
+    mock_cluster.close = MagicMock()
+    return mock_cluster
+
+@pytest.fixture
+def mock_dask_client():
+    mock_client        = MagicMock()
+    mock_client.status = 'running'
+    mock_client.submit = MagicMock()
+    return mock_client
+
+@pytest.fixture()
+def mock_kafka_consumer():
+    mock_consumer           = MagicMock()
+    mock_consumer.subscribe = MagicMock()
+    return mock_consumer
+
+@pytest.fixture()
+def mock_streamer_start():
+    mock_streamer = MagicMock()
+    mock_streamer.start = MagicMock()
+    return mock_streamer
 
 ###########################
-# Tests Implementation of Telemetry Backend
+# functionality pytest cases with specific fixtures for AnalyticsBackendService class sub-methods
 ###########################
 
-# --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
-
-
-# --- To test Dask Streamer functionality ---
-# def test_StartDaskStreamer():   # Directly from the Streamer class
-#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
-#     stop_event = Event()
-#     kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3']
-#     oper_list = ['avg', 'min', 'max',]
-#     thresholds = {
-#         'avg_value': (10.0, 90.0),
-#         'min_value': (5.0, 95.0),
-#         'max_value': (15.0, 85.0),
-#         'latency'  : (2.0, 10.0)
-#     }
-
-#     # Start the DaskStreamer in a separate thread
-#     streamer_thread = Thread(
-#         target=DaskStreamer,
-#         args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event),
-#         kwargs={
-#             "window_size": "60s",
-#             "win_slide_duration": "30s",
-#             "time_stamp_col": "time_stamp"
-#         }
-#     )
-#     streamer_thread.start()
-#     try:
-#         while True:
-#             time.sleep(10)
-#     except KeyboardInterrupt:
-#         LOGGER.info("KeyboardInterrupt received. Stopping streamer...")
-#         stop_event.set()
-#         streamer_thread.join()
-#         LOGGER.info("Streamer stopped gracefully.")
-
-# --- To test Start Streamer functionality ---
-# def test_StartDaskStreamer():
-#     LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ")
-#     analyzer_obj = create_analyzer_dask()
-#     # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj))
-#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
-#     analyzer_to_generate : Dict = {
-#         "algo_name"       : analyzer_obj.algorithm_name,
-#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
-#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
-#         "oper_mode"       : analyzer_obj.operation_mode,
-#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
-#         "oper_list"       : json.loads(analyzer_obj.parameters["oper_list"]),
-#         # "oper_list"       : analyzer_obj.parameters["oper_list"],
-#         "window_size"     : analyzer_obj.parameters["window_size"],
-#         "window_slider"   : analyzer_obj.parameters["window_slider"],
-#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
-#     }
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate)))
-#     response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate)
-#     assert isinstance(response, bool)
-#     time.sleep(100)
-#     LOGGER.info('Initiating StopRequestListener...')
-#     # AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid)
-#     LOGGER.debug(str(response)) 
-#     assert isinstance(response, bool)
+@pytest.fixture
+def analytics_service(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer, mock_streamer_start):
+    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster',   return_value = mock_dask_cluster  ), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client',    return_value = mock_dask_client   ), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer), \
+         patch('analytics.backend.service.Streamer.DaskStreamer.run',                               return_value = mock_streamer_start):
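+        # Patching DaskStreamer.run keeps streamer threads from doing real Kafka/Dask work during these tests.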
+         
+        service = AnalyticsBackendService()
+        yield service
+        service.close()
 
-# --- To test Start Streamer functionality ---
-# def test_StartSparkStreamer():
-#     LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
-#     analyzer_obj = create_analyzer()
-#     analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
-#     analyzer_to_generate : Dict = {
-#         "algo_name"       : analyzer_obj.algorithm_name,
-#         "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
-#         "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
-#         "oper_mode"       : analyzer_obj.operation_mode,
-#         "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
-#         "window_size"     : analyzer_obj.parameters["window_size"],
-#         "window_slider"   : analyzer_obj.parameters["window_slider"],
-#         # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
-#     }
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
-#     assert isinstance(response, bool)
+@pytest.fixture
+def analyzer_data():
+    return {
+        'algo_name'  : 'test_algorithm',
+        'oper_mode'  : 'test_mode',
+        'input_kpis' : get_input_kpi_list(),
+        'output_kpis': get_output_kpi_list(),
+        'thresholds' : get_thresholds(),
+        'batch_size' : get_batch_size(),
+        'window_size': get_windows_size(),
+        'duration'   : get_duration(),
+    }
 
-# --- To TEST StartRequestListenerFunctionality
-# def test_StartRequestListener():
-#     LOGGER.info('test_RunRequestListener')
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     AnalyticsBackendServiceObj.stop_event = Event()
-#     listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=())
-#     listener_thread.start()
-
-#     time.sleep(100)
-
-    # AnalyticsBackendServiceObj.stop_event.set()
-    # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds')
-    # listener_thread.join(timeout=10)
-    # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected."
-    # LOGGER.info('Completed test_RunRequestListener')
-
-# To test START and STOP communication together
-# def test_StopRequestListener():
-#     LOGGER.info('test_RunRequestListener')
-#     LOGGER.info('Initiating StartRequestListener...')
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
-#     # LOGGER.debug(str(response_thread))
-#     time.sleep(10)
-#     LOGGER.info('Initiating StopRequestListener...')
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
-#     LOGGER.debug(str(response)) 
-#     assert isinstance(response, bool)
+def test_start_streamer(analytics_service, analyzer_data):
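+    """StartStreamer should return True and register the analyzer in active_streamers."""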
+    analyzer_uuid = "test-analyzer-uuid"
+    # Start streamer
+    result = analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
+    assert result is True
+    assert analyzer_uuid in analytics_service.active_streamers
+
+def test_stop_streamer(analytics_service, analyzer_data):
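+    """StopStreamer should return True and remove the analyzer from active_streamers."""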
+    analyzer_uuid = "test-analyzer-uuid"
+
+    # Start the streamer so it can be stopped afterwards
+    analytics_service.StartStreamer(analyzer_uuid, analyzer_data)
+    assert analyzer_uuid in analytics_service.active_streamers
+
+    # Stop streamer
+    result = analytics_service.StopStreamer(analyzer_uuid)
+    assert result is True
+
+    # Verify that the streamer was stopped
+    assert analyzer_uuid not in analytics_service.active_streamers
+
+def test_close(analytics_service, mock_kafka_producer, mock_dask_cluster):
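+    """close() should flush the Kafka producer and shut down the Dask cluster."""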
+    analytics_service.close()
+
+    mock_kafka_producer.flush.assert_called_once()
+    mock_dask_cluster.close.assert_called_once()
+
+###########################
+# Functionality tests with dedicated fixtures for the DaskStreamer sub-methods
+###########################
+
+@pytest.fixture
+def dask_streamer(mock_kafka_producer, mock_dask_cluster, mock_dask_client, mock_kafka_consumer):
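+    """Return a DaskStreamer instance built with mocked Kafka and Dask clients."""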
+    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_producer', return_value = mock_kafka_producer), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_cluster',   return_value = mock_dask_cluster  ), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client',    return_value = mock_dask_client   ), \
+         patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_kafka_consumer', return_value = mock_kafka_consumer):
+        
+        return DaskStreamer(
+            key               = "test_key",
+            input_kpis        = get_input_kpi_list(),
+            output_kpis       = get_output_kpi_list(),
+            thresholds        = get_thresholds(),
+            batch_size        = get_batch_size(),
+            window_size       = get_windows_size(),
+            cluster_instance  = mock_dask_cluster,
+            producer_instance = mock_kafka_producer,
+        )
 
-# To independently tests the SparkListener functionality
-# def test_SparkListener():
-#     LOGGER.info('test_RunRequestListener')
-#     AnalyticsBackendServiceObj = AnalyticsBackendService()
-#     response = AnalyticsBackendServiceObj.RunSparkStreamer(
-#         get_kpi_id_list(), get_operation_list(), get_threshold_dict()
-#         )
-#     LOGGER.debug(str(response))
+def test_dask_streamer_initialization(dask_streamer):
+    """Test if the DaskStreamer initializes correctly."""
+    assert dask_streamer.key         == "test_key"
+    assert dask_streamer.batch_size  == get_batch_size()
+    assert dask_streamer.window_size is None
+    assert dask_streamer.consumer    is not None
+    assert dask_streamer.producer    is not None
+    assert dask_streamer.client      is not None
+
+def test_run_stops_on_no_consumer(dask_streamer):
+    """Test if the run method exits when the consumer is not initialized."""
+    dask_streamer.consumer = None
+    with patch('time.sleep', return_value=None):
+        dask_streamer.run()
+
+    assert not dask_streamer.running
+
+def test_task_handler_selector_valid_handler(dask_streamer, mock_dask_client):
+    """Test task handler selection with a valid handler."""
+    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.initialize_dask_client', return_value = mock_dask_client):
+
+        dask_streamer.task_handler_selector()
+        assert dask_streamer.client.status == 'running'
+
+def test_task_handler_selector_invalid_handler(dask_streamer):
+    """Test task handler selection with an invalid handler."""
+    with patch('analytics.backend.service.AnalyzerHandlers.Handlers.is_valid_handler', return_value=False):
+        dask_streamer.task_handler_selector()
+        assert dask_streamer.batch == []
+
+def test_produce_result(dask_streamer):
+    """Test if produce_result sends records to Kafka."""
+    result = [{"kpi_id": "kpi1", "value": 100}]
+    with patch('analytics.backend.service.AnalyzerHelper.AnalyzerHelper.delivery_report', return_value=None) as mock_delivery_report, \
+         patch.object(dask_streamer.producer, 'produce') as mock_produce:
+        dask_streamer.produce_result(result, "test_topic")
+        mock_produce.assert_called_once_with(
+            "test_topic",
+            key="kpi1",
+            value=json.dumps({"kpi_id": "kpi1", "value": 100}),
+            callback=mock_delivery_report
+        )
+
+def test_stop(dask_streamer):
+    """Test the cleanup method."""
+    with patch.object(dask_streamer.consumer, 'close') as mock_consumer_close, \
+         patch.object(dask_streamer.client,   'close') as mock_client_close, \
+         patch('time.sleep', return_value=0):
+        
+        # Mock the conditions required for the close calls
+        dask_streamer.client.status = 'running'
+        
+        dask_streamer.stop()
+
+        mock_consumer_close.assert_called_once()
+        mock_client_close.assert_called_once()
+
+def test_run_with_valid_consumer(dask_streamer):
+    """Test the run method with a valid Kafka consumer."""
+    with patch.object(dask_streamer.consumer, 'poll')         as mock_poll, \
+         patch.object(dask_streamer, 'task_handler_selector') as mock_task_handler_selector:
+        
+        # Simulate valid messages without errors
+        mock_message_1 = MagicMock()
+        mock_message_1.value.return_value = b'{"kpi_id": "kpi1", "value": 100}'
+        mock_message_1.error.return_value = None  # No error
+
+        mock_message_2 = MagicMock()
+        mock_message_2.value.return_value = b'{"kpi_id": "kpi2", "value": 200}'
+        mock_message_2.error.return_value = None  # No error
+
+        # Mock `poll` to return valid messages
+        mock_poll.side_effect = [mock_message_1, mock_message_2]
+        
+        # Run the `run` method in a limited loop
+        with patch('time.sleep', return_value=None):  # Mock `sleep` to avoid delays
+            dask_streamer.running = True            
+            dask_streamer.batch_size = 2            
+            
+            # Limit the loop by breaking it after one full processing cycle
+            def stop_running_after_task_handler():
+                logger.info("Stopping the streamer after processing the first batch.")
+                dask_streamer.running = False
+            
+            mock_task_handler_selector.side_effect = stop_running_after_task_handler
+            dask_streamer.run()
+        
+        assert len(dask_streamer.batch) == 0  # Batch should be cleared after processing
+        mock_task_handler_selector.assert_called_once()  # Task handler should be called once
+        mock_poll.assert_any_call(timeout=2.0)  # Poll should have been called at least once with timeout=2.0
+
+# Tests for the aggregation_handler and threshold_handler functions from AnalyzerHandlers.py
+def test_aggregation_handler():
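+    """aggregation_handler should return a list of result dictionaries for a sample batch."""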
+
+    # Create a sample batch
+    batch           = get_batch()
+    input_kpi_list  = get_input_kpi_list()
+    output_kpi_list = get_output_kpi_list()
+    thresholds      = get_thresholds() 
+
+    # Test aggregation_handler
+    aggregated_df = aggregation_handler(
+        "test_batch", "test_key", batch, input_kpi_list, output_kpi_list, thresholds
+    )
+    assert isinstance(aggregated_df, list)
+    assert all(isinstance(item, dict) for item in aggregated_df)
+
+# Test threshold_handler
+def test_threshold_handler():
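+    """threshold_handler should return a single-row DataFrame for the sample aggregated data."""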
+    # Create a sample aggregated DataFrame
+    agg_df     = get_agg_df()
+    thresholds = get_thresholds()
+
+    # Test threshold_handler
+    result = threshold_handler("test_key", agg_df, thresholds["task_parameter"][0])
+
+    assert isinstance(result, pd.DataFrame)
+    assert result.shape == (1, 7)
+
+
+###########################
+# Integration tests of the Streamer with the backend service (not to be run in the CI/CD pipeline)
+###########################
+# These tests check the integration of the backend service with the Streamer on a local machine.
+
+# --- "test_validate_kafka_topics" should be run before the functionality tests ---
+# def test_validate_kafka_topics():
+#     logger.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
 #     assert isinstance(response, bool)
+
+# def test_backend_integration_with_analyzer():
+#     backendServiceObject = AnalyticsBackendService()
+#     backendServiceObject.install_servicers()
+#     logger.info(" waiting for 2 minutes for the backend service before termination  ... ")
+#     time.sleep(150)
+#     logger.info(" Initiating stop collector ... ")
+#     status = backendServiceObject.StopStreamer("efef4d95-1cf1-43c4-9742-95c283ddd666")
+#     backendServiceObject.close()
+#     assert isinstance(status, bool)
+#     assert status is True
+#     logger.info(" Backend service terminated successfully ... ")