diff --git a/.gitignore b/.gitignore
index 20b98c30c5b3edb0983578b0a5f74fb1c1f3025e..e1f87cfd3842c264bd219237e9afe113d61c35bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -176,3 +176,6 @@ libyang/
 
 # Other logs
 **/logs/*.log.*
+
+# PySpark checkpoints
+src/analytics/.spark/*
diff --git a/deploy/all.sh b/deploy/all.sh
index e9b33b469b7cad1547ab0dcb63e326672f51971e..06b8ee701530f56381080879d0e2941b664e5197 100755
--- a/deploy/all.sh
+++ b/deploy/all.sh
@@ -33,7 +33,7 @@ export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device pathcomp service slice n
 #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
 
 # Uncomment to activate Monitoring Framework (new)
-#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
+#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"
 
 # Uncomment to activate BGP-LS Speaker
 #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
diff --git a/deploy/tfs.sh b/deploy/tfs.sh
index e7201441815c7cc08c46cce3714f33f43401c2eb..1dceae1c1b4ee3e2a36816557b54df48b224eba1 100755
--- a/deploy/tfs.sh
+++ b/deploy/tfs.sh
@@ -182,7 +182,19 @@ kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --
     --from-literal=CRDB_SSLMODE=require
 printf "\n"
 
-echo "Create secret with Apache Kafka data for KPI and Telemetry microservices"
+echo "Create secret with CockroachDB data for Analytics microservices"
+CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
+CRDB_DATABASE_ANALYTICS="tfs_analytics"  # TODO: replace with a dedicated configurable environment variable
+kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
+    --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
+    --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
+    --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \
+    --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
+    --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
+    --from-literal=CRDB_SSLMODE=require
+printf "\n"
+
+echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices"
 KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
 kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
@@ -264,7 +276,7 @@ for COMPONENT in $TFS_COMPONENTS; do
 
         if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then
             $DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG"
-        elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
+        elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
             BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log"
             $DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG"
 
@@ -287,7 +299,7 @@ for COMPONENT in $TFS_COMPONENTS; do
 
         echo "  Pushing Docker image to '$TFS_REGISTRY_IMAGES'..."
 
-        if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
+        if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ] ; then
             IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
 
             TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log"
@@ -338,7 +350,7 @@ for COMPONENT in $TFS_COMPONENTS; do
         cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST"
     fi
 
-    if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
+    if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
         IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
         VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4)
         sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..0fa3ed0be6eda8cf944e199543e3c2cd59cc98d6
--- /dev/null
+++ b/manifests/analyticsservice.yaml
@@ -0,0 +1,128 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: analyticsservice
+spec:
+  selector:
+    matchLabels:
+      app: analyticsservice
+  #replicas: 1
+  template:
+    metadata:
+      labels:
+        app: analyticsservice
+    spec:
+      terminationGracePeriodSeconds: 5
+      containers:
+        - name: frontend
+          image: labs.etsi.org:5050/tfs/controller/analytics-frontend:latest
+          imagePullPolicy: Always
+          ports:
+            - containerPort: 30080
+            - containerPort: 9192
+          env:
+            - name: LOG_LEVEL
+              value: "INFO"
+          envFrom:
+            - secretRef:
+                name: crdb-analytics
+            - secretRef:
+                name: kfk-kpi-data
+          readinessProbe:
+            exec:
+              command: ["/bin/grpc_health_probe", "-addr=:30080"]
+          livenessProbe:
+            exec:
+              command: ["/bin/grpc_health_probe", "-addr=:30080"]
+          resources:
+            requests:
+              cpu: 250m
+              memory: 128Mi
+            limits:
+              cpu: 1000m
+              memory: 1024Mi
+        - name: backend
+          image: labs.etsi.org:5050/tfs/controller/analytics-backend:latest
+          imagePullPolicy: Always
+          ports:
+            - containerPort: 30090
+            - containerPort: 9192
+          env:
+            - name: LOG_LEVEL
+              value: "INFO"
+          envFrom:
+            - secretRef:
+                name: kfk-kpi-data
+          readinessProbe:
+            exec:
+              command: ["/bin/grpc_health_probe", "-addr=:30090"]
+          livenessProbe:
+            exec:
+              command: ["/bin/grpc_health_probe", "-addr=:30090"]
+          resources:
+            requests:
+              cpu: 250m
+              memory: 128Mi
+            limits:
+              cpu: 1000m
+              memory: 1024Mi
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: analyticsservice
+  labels:
+    app: analyticsservice
+spec:
+  type: ClusterIP
+  selector:
+    app: analyticsservice
+  ports:
+    - name: frontend-grpc
+      protocol: TCP
+      port: 30080
+      targetPort: 30080
+    - name: backend-grpc
+      protocol: TCP
+      port: 30090
+      targetPort: 30090
+    - name: metrics
+      protocol: TCP
+      port: 9192
+      targetPort: 9192
+---
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: analyticsservice-hpa
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: analyticsservice
+  minReplicas: 1
+  maxReplicas: 20
+  metrics:
+    - type: Resource
+      resource:
+        name: cpu
+        target:
+          type: Utilization
+          averageUtilization: 80
+  #behavior:
+  #  scaleDown:
+  #    stabilizationWindowSeconds: 30
diff --git a/manifests/kafka/02-kafka.yaml b/manifests/kafka/02-kafka.yaml
index 8e4562e6eabec34bf3b87912310479bd98022aeb..8400f5944193458ccdad8be5dbc189f8f40cdd7b 100644
--- a/manifests/kafka/02-kafka.yaml
+++ b/manifests/kafka/02-kafka.yaml
@@ -53,9 +53,9 @@ spec:
         - name: KAFKA_LISTENERS
           value: PLAINTEXT://:9092
         - name: KAFKA_ADVERTISED_LISTENERS
-          value: PLAINTEXT://localhost:9092
+          value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
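+          # NOTE: advertise the in-cluster DNS name so that other pods (e.g. the
+          # Telemetry and Analytics services) can reach the broker; "localhost" is
+          # only resolvable from within the broker pod itself.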
         image: wurstmeister/kafka
         imagePullPolicy: IfNotPresent
         name: kafka-broker
         ports:
-          - containerPort: 9092
\ No newline at end of file
+          - containerPort: 9092
diff --git a/proto/analytics_frontend.proto b/proto/analytics_frontend.proto
index 096c1ee035ae663359d9f4df1e071d3997a0d351..ace0581db816bee1d0d20746f2b864dce602567b 100644
--- a/proto/analytics_frontend.proto
+++ b/proto/analytics_frontend.proto
@@ -30,21 +30,25 @@ message AnalyzerId {
 }
 
 enum AnalyzerOperationMode {
-  ANALYZEROPERATIONMODE_BATCH     = 0;
-  ANALYZEROPERATIONMODE_STREAMING = 1;
+  ANALYZEROPERATIONMODE_UNSPECIFIED = 0;
+  ANALYZEROPERATIONMODE_BATCH       = 1;
+  ANALYZEROPERATIONMODE_STREAMING   = 2;
 }
 
+// NOTE: a duration_s field has been added to the analyzer (see below).
 message Analyzer {
-  string                     algorithm_name       = 1; // The algorithm to be executed
-  repeated kpi_manager.KpiId input_kpi_ids        = 2; // The KPI Ids to be processed by the analyzer
-  repeated kpi_manager.KpiId output_kpi_ids       = 3; // The KPI Ids produced by the analyzer
-  AnalyzerOperationMode      operation_mode       = 4; // Operation mode of the analyzer
-
-  // In batch mode...
-  float                      batch_min_duration_s = 5; // ..., min duration to collect before executing batch
-  float                      batch_max_duration_s = 6; // ..., max duration collected to execute the batch
-  uint64                     batch_min_size       = 7; // ..., min number of samples to collect before executing batch
-  uint64                     batch_max_size       = 8; // ..., max number of samples collected to execute the batch
+  AnalyzerId                 analyzer_id          = 1;
+  string                     algorithm_name       = 2;  // The algorithm to be executed
+  float                      duration_s           = 3;  // Terminate the data analytics thread after this duration (in seconds); 0 means run indefinitely
+  repeated kpi_manager.KpiId input_kpi_ids        = 4;  // The KPI Ids to be processed by the analyzer
+  repeated kpi_manager.KpiId output_kpi_ids       = 5;  // The KPI Ids produced by the analyzer
+  AnalyzerOperationMode      operation_mode       = 6;  // Operation mode of the analyzer
+  map<string, string>        parameters           = 7;  // Dictionary of (key, value) pairs, e.g. (window_size, 10); see the illustrative example after this message.
+  // In batch mode... 
+  float                      batch_min_duration_s = 8;  // ..., min duration to collect before executing batch
+  float                      batch_max_duration_s = 9;  // ..., max duration collected to execute the batch
+  uint64                     batch_min_size       = 10; // ..., min number of samples to collect before executing batch
+  uint64                     batch_max_size       = 11; // ..., max number of samples collected to execute the batch
 }
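+
+// Illustrative example (assumption, matching the backend defaults in SparkStreaming.py):
+// parameters could carry {"window_size": "60 seconds", "window_slider": "30 seconds"}
+// plus per-KPI thresholds; the exact keys are interpreted by the Analytics backend.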
 
 message AnalyzerFilter {
diff --git a/scripts/run_tests_locally-analytics-DB.sh b/scripts/run_tests_locally-analytics-DB.sh
new file mode 100755
index 0000000000000000000000000000000000000000..9df5068d6bde361a4a1e73b96990c0d407c88cb4
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-DB.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/tests/test_analytics_db.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e30d30da623b2d0eee3d925d69a846b4b1f516a3
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/frontend/tests/test_frontend.py
diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index 9cf404ffcef6c99b261f81eb0c6b910dd60845e5..79db05fcf1259365e8a909ee99395eb59dfb9437 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
-    telemetry/backend/tests/testTelemetryBackend.py
+python3 -m pytest --log-level=INFO --log-cli-level=DEBUG --verbose \
+    telemetry/backend/tests/test_TelemetryBackend.py
diff --git a/scripts/show_logs_analytics_backend.sh b/scripts/show_logs_analytics_backend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..afb58567ca5ab250da48d2cfffa2c56abdff2db2
--- /dev/null
+++ b/scripts/show_logs_analytics_backend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c backend
diff --git a/scripts/show_logs_analytics_frontend.sh b/scripts/show_logs_analytics_frontend.sh
new file mode 100755
index 0000000000000000000000000000000000000000..6d3fae10b366f0082d3a393c224e8f1cb7830721
--- /dev/null
+++ b/scripts/show_logs_analytics_frontend.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+########################################################################################################################
+# Define your deployment settings here
+########################################################################################################################
+
+# If not already set, set the name of the Kubernetes namespace to deploy to.
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
+
+########################################################################################################################
+# Automated steps start here
+########################################################################################################################
+
+kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c frontend
diff --git a/src/analytics/README.md b/src/analytics/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..9663e5321ace6866491b90553553d9ccbf5793a1
--- /dev/null
+++ b/src/analytics/README.md
@@ -0,0 +1,4 @@
+# How to locally run and test the Analytics service (To be added soon)
+
+### Pre-requisites
+The following requirements should be fulfilled before executing the Analytics service.
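+
+For reference, a minimal sketch of the local test workflow (script names taken from this
+repository; adjust to your deployment):
+
+```bash
+# Run the Analytics database and frontend unit tests against the local CockroachDB instance
+scripts/run_tests_locally-analytics-DB.sh
+scripts/run_tests_locally-analytics-frontend.sh
+
+# Inspect the logs of the deployed Analytics service
+scripts/show_logs_analytics_frontend.sh
+scripts/show_logs_analytics_backend.sh
+```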
diff --git a/src/analytics/__init__.py b/src/analytics/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..17adcd3ab1df5704cc7ef0c5a19b3cfb1539ee22
--- /dev/null
+++ b/src/analytics/backend/Dockerfile
@@ -0,0 +1,69 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-slim
+
+# Install dependencies
+RUN apt-get --yes --quiet --quiet update && \
+    apt-get --yes --quiet --quiet install wget g++ git && \
+    rm -rf /var/lib/apt/lists/*
+
+# Set Python to show logs as they occur
+ENV PYTHONUNBUFFERED=0
+
+# Download the gRPC health probe
+RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
+    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
+    chmod +x /bin/grpc_health_probe
+
+# Get generic Python packages
+RUN python3 -m pip install --upgrade pip
+RUN python3 -m pip install --upgrade setuptools wheel
+RUN python3 -m pip install --upgrade pip-tools
+
+# Get common Python packages
+# Note: this step enables sharing the previous Docker build steps among all the Python components
+WORKDIR /var/teraflow
+COPY common_requirements.in common_requirements.in
+RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
+RUN python3 -m pip install -r common_requirements.txt
+
+# Add common files into working directory
+WORKDIR /var/teraflow/common
+COPY src/common/. ./
+RUN rm -rf proto
+
+# Create proto sub-folder, copy .proto files, and generate Python code
+RUN mkdir -p /var/teraflow/common/proto
+WORKDIR /var/teraflow/common/proto
+RUN touch __init__.py
+COPY proto/*.proto ./
+RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
+RUN rm *.proto
+RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
+
+# Create component sub-folders, get specific Python packages
+RUN mkdir -p /var/teraflow/analytics/backend
+WORKDIR /var/teraflow/analytics/backend
+COPY src/analytics/backend/requirements.in requirements.in
+RUN pip-compile --quiet --output-file=requirements.txt requirements.in
+RUN python3 -m pip install -r requirements.txt
+
+# Add component files into working directory
+WORKDIR /var/teraflow
+COPY src/analytics/__init__.py analytics/__init__.py
+COPY src/analytics/backend/. analytics/backend/
+
+# Start the service
+ENTRYPOINT ["python", "-m", "analytics.backend.service"]
diff --git a/src/analytics/backend/__init__.py b/src/analytics/backend/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..9df678fe819f33d479b8f5090ca9ac4eb1f4047c
--- /dev/null
+++ b/src/analytics/backend/requirements.in
@@ -0,0 +1,16 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pyspark==3.5.2
+confluent-kafka==2.3.*
diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
new file mode 100755
index 0000000000000000000000000000000000000000..595603567fe537d9f7b33224cba0fe016a439631
--- /dev/null
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -0,0 +1,132 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+import logging
+import threading
+from common.tools.service.GenericGrpcService import GenericGrpcService
+from analytics.backend.service.SparkStreaming import SparkStreamer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_port_grpc
+
+
+LOGGER = logging.getLogger(__name__)
+
+class AnalyticsBackendService(GenericGrpcService):
+    """
+    Listens for analyzer requests on the Kafka ANALYTICS_REQUEST topic and starts/stops the corresponding Spark streaming jobs.
+    """
+    def __init__(self, cls_name : str = __name__) -> None:
+        LOGGER.info('Init AnalyticsBackendService')
+        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
+        super().__init__(port, cls_name=cls_name)
+        self.running_threads = {}       # To keep track of all running analyzers 
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                            'group.id'           : 'analytics-frontend',
+                                            'auto.offset.reset'  : 'latest'})
+
+    def StartSparkStreamer(self, analyzer_uuid, analyzer):
+        kpi_list      = analyzer['input_kpis'] 
+        oper_list     = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
+        thresholds    = analyzer['thresholds']
+        window_size   = analyzer['window_size']
+        window_slider = analyzer['window_slider']
+        print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+            kpi_list, oper_list, thresholds, window_size, window_slider))
+        LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
+            kpi_list, oper_list, thresholds, window_size, window_slider))
+        try:
+            stop_event = threading.Event()
+            thread = threading.Thread(target=SparkStreamer, 
+                            args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
+                                  window_size, window_slider, None ))
+            self.running_threads[analyzer_uuid] = (thread, stop_event)
+            thread.start()
+            print      ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            return True
+        except Exception as e:
+            print       ("Failed to initiate Analyzer backend: {:}".format(e))
+            LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
+            return False
+
+    def StopRequestListener(self, threadInfo: tuple):
+        try:
+            thread, stop_event = threadInfo
+            stop_event.set()
+            thread.join()
+            print      ("Terminating Analytics backend RequestListener")
+            LOGGER.info("Terminating Analytics backend RequestListener")
+            return True
+        except Exception as e:
+            print       ("Failed to terminate analytics backend {:}".format(e))
+            LOGGER.error("Failed to terminate analytics backend {:}".format(e))
+            return False
+
+    def StartRequestListener(self):
+        stop_event = threading.Event()
+        thread = threading.Thread(target=self.RequestListener,
+                                  args=(stop_event,) )
+        thread.start()
+        return (thread, stop_event)
+
+    def RequestListener(self, stop_event):
+        """
+        Listener for analyzer requests on the Kafka ANALYTICS_REQUEST topic.
+        """
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
+        while not stop_event.is_set():
+            receive_msg = consumer.poll(2.0)
+            if receive_msg is None:
+                continue
+            elif receive_msg.error():
+                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    print("Consumer error: {}".format(receive_msg.error()))
+                    break
+            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
+            analyzer_uuid = receive_msg.key().decode('utf-8')
+            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+            print       ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
+
+            if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
+                self.TerminateAnalyzerBackend(analyzer_uuid)
+            else:
+                self.StartSparkStreamer(analyzer_uuid, analyzer)
+        LOGGER.debug("Stop Event activated. Terminating...")
+        print       ("Stop Event activated. Terminating...")
+
+    def TerminateAnalyzerBackend(self, analyzer_uuid):
+        if analyzer_uuid in self.running_threads:
+            try:
+                thread, stop_event = self.running_threads[analyzer_uuid]
+                stop_event.set()
+                thread.join()
+                del self.running_threads[analyzer_uuid]
+                print      ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
+                LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
+                return True
+            except Exception as e:
+                LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
+                return False
+        else:
+            print         ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
+            LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))           
+            # generate confirmation towards frontend
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
new file mode 100644
index 0000000000000000000000000000000000000000..96e1aa05d898ffdd23c533b74ee87fbf03f54576
--- /dev/null
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -0,0 +1,154 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging, time
+from pyspark.sql                  import SparkSession
+from pyspark.sql.types            import StructType, StructField, StringType, DoubleType, TimestampType
+from pyspark.sql.functions        import from_json, col, window, avg, min, max, first, last, stddev, when, round
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+
+LOGGER = logging.getLogger(__name__)
+
+def DefiningSparkSession():
+    # Create a Spark session with a specific Spark version (3.5.0)
+    return SparkSession.builder \
+            .appName("Analytics") \
+            .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
+            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
+            .getOrCreate()
+
+def SettingKafkaConsumerParams():   # TODO:  create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
+    return {
+            # "kafka.bootstrap.servers": '127.0.0.1:9092',
+            "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
+            "subscribe"              : KafkaTopic.VALUE.value,
+            "startingOffsets"        : 'latest',
+            "failOnDataLoss"         : 'false'              # Optional: Set to "true" to fail the query on data loss
+        }
+
+def DefiningRequestSchema():
+    return StructType([
+            StructField("time_stamp" ,  StringType()  , True),
+            StructField("kpi_id"     ,  StringType()  , True),
+            StructField("kpi_value"  ,  DoubleType()  , True)
+        ])
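+
+# Illustrative example of a Kafka value record matching the schema above
+# (assumption: time_stamp is in ISO format, kpi_id is a KPI UUID):
+#   {"time_stamp": "2024-08-20T10:00:00Z", "kpi_id": "6e22f180-ba28-4641-b190-2287bf448888", "kpi_value": 12.3}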
+
+def GetAggregations(oper_list):
+    # Define the possible aggregation functions
+    agg_functions = {
+        'avg'  :  round(avg    ("kpi_value"), 3) .alias("avg_value"),
+        'min'  :  round(min    ("kpi_value"), 3) .alias("min_value"),
+        'max'  :  round(max    ("kpi_value"), 3) .alias("max_value"),
+        'first':  round(first  ("kpi_value"), 3) .alias("first_value"),
+        'last' :  round(last   ("kpi_value"), 3) .alias("last_value"),
+        'stdev':  round(stddev ("kpi_value"), 3) .alias("stdev_value")
+    }
+    return [agg_functions[op] for op in oper_list if op in agg_functions]   # Filter and return only the selected aggregations
+
+def ApplyThresholds(aggregated_df, thresholds):
+    # Apply fail (TH-FAIL) and raise (TH-RAISE) thresholds from the thresholds dictionary to the aggregated DataFrame.
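+    # e.g. thresholds = {"avg_value": (20, 30)} flags avg_value < 20 as avg_value_THRESHOLD_FAIL
+    # and avg_value > 30 as avg_value_THRESHOLD_RAISE (see tests/messages.py for a fuller example).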
+    
+    # Loop through each column name and its associated thresholds
+    for col_name, (fail_th, raise_th) in thresholds.items():
+        # Apply TH-Fail condition (if column value is less than the fail threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_FAIL", 
+            when(col(col_name) < fail_th, True).otherwise(False)
+        )
+        # Apply TH-RAISE condition (if column value is greater than the raise threshold)
+        aggregated_df = aggregated_df.withColumn(
+            f"{col_name}_THRESHOLD_RAISE", 
+            when(col(col_name) > raise_th, True).otherwise(False)
+        )
+    return aggregated_df
+
+def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
+                  window_size=None, win_slide_duration=None, time_stamp_col=None):
+    """
+    Method to perform Spark operations on a Kafka stream.
+    NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
+    """
+    kafka_consumer_params = SettingKafkaConsumerParams()         # Define the Kafka consumer parameters
+    schema                = DefiningRequestSchema()              # Define the schema for the incoming JSON data
+    spark                 = DefiningSparkSession()               # Define the spark session with app name and spark version
+    
+    # extra options default assignment
+    if window_size        is None: window_size        = "60 seconds"    # default
+    if win_slide_duration is None: win_slide_duration = "30 seconds"    # default
+    if time_stamp_col     is None: time_stamp_col     = "time_stamp"    # default
+    
+    try:
+        # Read data from Kafka
+        raw_stream_data = spark \
+            .readStream \
+            .format("kafka") \
+            .options(**kafka_consumer_params) \
+            .load()
+
+        # Convert the value column from Kafka to a string
+        stream_data          = raw_stream_data.selectExpr("CAST(value AS STRING)")
+        # Parse the JSON string into a DataFrame with the defined schema
+        parsed_stream_data   = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
+        # Select the parsed fields
+        final_stream_data    = parsed_stream_data.select("parsed_value.*")
+        # Convert the time_stamp to proper timestamp (assuming it's in ISO format)
+        final_stream_data    = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
+        # Filter the stream to only include rows where the kpi_id is in the kpi_list
+        filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
+        # Define a window for aggregation
+        windowed_stream_data = filtered_stream_data \
+                                .groupBy(
+                                    window( col(time_stamp_col), 
+                                           window_size, slideDuration=win_slide_duration
+                                           ),
+                                    col("kpi_id")
+                                ) \
+                                .agg(*GetAggregations(oper_list))
+        # Apply thresholds to the aggregated data
+        thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
+
+        # --- This will write output on console: FOR TESTING PURPOSES
+        # Start the Spark streaming query
+        # query = thresholded_stream_data \
+        #     .writeStream \
+        #     .outputMode("update") \
+        #     .format("console") 
+
+        # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
+        query = thresholded_stream_data \
+            .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
+            .writeStream \
+            .format("kafka") \
+            .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
+            .option("topic",                   KafkaTopic.ANALYTICS_RESPONSE.value) \
+            .option("checkpointLocation",      "analytics/.spark/checkpoint") \
+            .outputMode("update")
+
+        # Start the query execution
+        queryHandler = query.start()
+
+        # Loop to check for stop event flag. To be set by stop collector method.
+        while True:
+            if stop_event.is_set():
+                LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
+                print       ("Stop Event activated. Terminating in 5 seconds...")
+                time.sleep(5)
+                queryHandler.stop()
+                break
+            time.sleep(5)
+
+    except Exception as e:
+        print("Error in Spark streaming process: {:}".format(e))
+        LOGGER.debug("Error in Spark streaming process: {:}".format(e))
diff --git a/src/analytics/backend/service/__init__.py b/src/analytics/backend/service/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/service/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3c4c36b7c7bd952164bf9e48a45e22fb00575564
--- /dev/null
+++ b/src/analytics/backend/service/__main__.py
@@ -0,0 +1,56 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, signal, sys, threading
+from prometheus_client import start_http_server
+from common.Settings import get_log_level, get_metrics_port
+from .AnalyticsBackendService import AnalyticsBackendService
+
+terminate = threading.Event()
+LOGGER = None
+
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
+    LOGGER.warning('Terminate signal received')
+    terminate.set()
+
+def main():
+    global LOGGER # pylint: disable=global-statement
+
+    log_level = get_log_level()
+    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
+    LOGGER = logging.getLogger(__name__)
+
+    signal.signal(signal.SIGINT,  signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    LOGGER.info('Starting...')
+
+    # Start metrics server
+    metrics_port = get_metrics_port()
+    start_http_server(metrics_port)
+
+    grpc_service = AnalyticsBackendService()
+    grpc_service.start()
+
+    # Wait for Ctrl+C or termination signal
+    while not terminate.wait(timeout=1.0): pass
+
+    LOGGER.info('Terminating...')
+    grpc_service.stop()
+
+    LOGGER.info('Bye')
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/analytics/backend/tests/__init__.py b/src/analytics/backend/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02
--- /dev/null
+++ b/src/analytics/backend/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py
new file mode 100644
index 0000000000000000000000000000000000000000..9acd6ad9dffe4a5b10b107a6923ed85170ee141f
--- /dev/null
+++ b/src/analytics/backend/tests/messages.py
@@ -0,0 +1,34 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def get_kpi_id_list():
+    return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
+
+def get_operation_list():
+    return [ 'avg', 'max' ]     # possibilities ['avg', 'min', 'max', 'first', 'last', 'stdev']
+
+def get_threshold_dict():
+    threshold_dict = {
+        'avg_value'    : (20, 30),
+        'min_value'    : (00, 10), 
+        'max_value'    : (45, 50),
+        'first_value'  : (00, 10),
+        'last_value'   : (40, 50),
+        'stdev_value'  : (00, 10),
+    }
+    # Filter threshold_dict based on the operation_list
+    return {
+        op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
+    }
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
new file mode 100644
index 0000000000000000000000000000000000000000..2f40faba94ef7081db609116e8fd869e3d119a24
--- /dev/null
+++ b/src/analytics/backend/tests/test_backend.py
@@ -0,0 +1,64 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import logging
+import threading
+from common.tools.kafka.Variables import KafkaTopic
+from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
+from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
+
+LOGGER = logging.getLogger(__name__)
+
+
+###########################
+# Tests Implementation of Analytics Backend
+###########################
+
+# --- "test_validate_kafka_topics" should be run before the functionality tests ---
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+# def test_StartRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     LOGGER.debug(str(response)) 
+#     assert isinstance(response, tuple)
+
+# To test START and STOP communication together
+def test_StopRequestListener():
+    LOGGER.info('test_StopRequestListener')
+    LOGGER.info('Initiating StartRequestListener...')
+    AnalyticsBackendServiceObj = AnalyticsBackendService()
+    response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+    # LOGGER.debug(str(response_thread))
+    time.sleep(10)
+    LOGGER.info('Initiating StopRequestListener...')
+    response = AnalyticsBackendServiceObj.StopRequestListener(response_thread)
+    LOGGER.debug(str(response)) 
+    assert isinstance(response, bool)
+
+# To independently tests the SparkListener functionality
+# def test_SparkListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer(
+#         get_kpi_id_list(), get_operation_list(), get_threshold_dict()
+#         )
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
diff --git a/src/analytics/database/AnalyzerEngine.py b/src/analytics/database/AnalyzerEngine.py
new file mode 100644
index 0000000000000000000000000000000000000000..9294e09966ef9e13c9cfa3cab590e5d0c8b6a80e
--- /dev/null
+++ b/src/analytics/database/AnalyzerEngine.py
@@ -0,0 +1,40 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, sqlalchemy
+from common.Settings import get_setting
+
+LOGGER = logging.getLogger(__name__)
+CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
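+# e.g. (illustrative, using the defaults found elsewhere in this repository):
+#   cockroachdb://tfs:tfs123@cockroachdb-public.crdb.svc.cluster.local:26257/tfs-analyzer?sslmode=require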
+
+class AnalyzerEngine:
+    @staticmethod
+    def get_engine() -> sqlalchemy.engine.Engine:
+        crdb_uri = get_setting('CRDB_URI', default=None)
+        if crdb_uri is None:
+            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
+            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
+            CRDB_DATABASE  = "tfs-analyzer"             # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
+            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
+            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
+            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
+            crdb_uri = CRDB_URI_TEMPLATE.format(
+                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
+        try:
+            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
+            LOGGER.info('AnalyzerDB initialized with DB URL: {:}'.format(crdb_uri))
+        except: # pylint: disable=bare-except # pragma: no cover
+            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
+            return None # type: ignore
+        return engine
diff --git a/src/analytics/database/AnalyzerModel.py b/src/analytics/database/AnalyzerModel.py
new file mode 100644
index 0000000000000000000000000000000000000000..c33e396e06a8dce96a86951a64aa59b510931dfe
--- /dev/null
+++ b/src/analytics/database/AnalyzerModel.py
@@ -0,0 +1,106 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import enum
+
+from sqlalchemy     import Column, String, Float, Enum, BigInteger, JSON
+from sqlalchemy.orm import registry
+from common.proto   import analytics_frontend_pb2
+from common.proto   import kpi_manager_pb2
+
+from sqlalchemy.dialects.postgresql import UUID, ARRAY
+
+
+logging.basicConfig(level=logging.INFO)
+LOGGER = logging.getLogger(__name__)
+
+# Create a base class for declarative models
+Base = registry().generate_base()
+
+class AnalyzerOperationMode (enum.Enum):
+    BATCH     = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_BATCH
+    STREAMING = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+
+class Analyzer(Base):
+    __tablename__ = 'analyzer'
+
+    analyzer_id           = Column( UUID(as_uuid=False)        , primary_key=True)
+    algorithm_name        = Column( String                     , nullable=False  )
+    input_kpi_ids         = Column( ARRAY(UUID(as_uuid=False)) , nullable=False  )
+    output_kpi_ids        = Column( ARRAY(UUID(as_uuid=False)) , nullable=False  )
+    operation_mode        = Column( Enum(AnalyzerOperationMode), nullable=False  )
+    parameters            = Column( JSON                       , nullable=True   )
+    batch_min_duration_s  = Column( Float                      , nullable=False  )
+    batch_max_duration_s  = Column( Float                      , nullable=False  )
+    batch_min_size        = Column( BigInteger                 , nullable=False  )
+    batch_max_size        = Column( BigInteger                 , nullable=False  )
+
+    # helps in logging the information
+    def __repr__(self):
+            return (f"<Analyzer(analyzer_id='{self.analyzer_id}'       , algorithm_name='{self.algorithm_name}', "
+                    f"input_kpi_ids={self.input_kpi_ids}               , output_kpi_ids={self.output_kpi_ids}, "
+                    f"operation_mode='{self.operation_mode}'           , parameters={self.parameters}, "
+                    f"batch_min_duration_s={self.batch_min_duration_s} , batch_max_duration_s={self.batch_max_duration_s}, "
+                    f"batch_min_size={self.batch_min_size}             , batch_max_size={self.batch_max_size})>")
+
+
+    @classmethod
+    def ConvertAnalyzerToRow(cls, request):
+        """
+        Create an instance of Analyzer table rows from a request object.
+        Args:    request: The request object containing analyzer gRPC message.
+        Returns: A row (an instance of Analyzer table) initialized with content of the request.
+        """
+        return cls(
+            analyzer_id          = request.analyzer_id.analyzer_id.uuid,
+            algorithm_name       = request.algorithm_name,
+            input_kpi_ids        = [k.kpi_id.uuid for k in request.input_kpi_ids],
+            output_kpi_ids       = [k.kpi_id.uuid for k in request.output_kpi_ids],
+            operation_mode       = AnalyzerOperationMode(request.operation_mode),   # converts integer to corresponding Enum class member
+            parameters           = dict(request.parameters),
+            batch_min_duration_s = request.batch_min_duration_s,
+            batch_max_duration_s = request.batch_max_duration_s,
+            batch_min_size       = request.batch_min_size,
+            batch_max_size       = request.batch_max_size
+        )
+
+    @classmethod
+    def ConvertRowToAnalyzer(cls, row):
+        """
+        Create and return an Analyzer gRPC message initialized with the content of a row.
+        Args: row: The Analyzer table instance (row) containing the data.
+        Returns:   An Analyzer gRPC message initialized with the content of the row.
+        """
+        # Create an instance of the Analyzer message
+        response                              = analytics_frontend_pb2.Analyzer()
+        response.analyzer_id.analyzer_id.uuid = row.analyzer_id
+        response.algorithm_name               = row.algorithm_name
+        response.operation_mode               = row.operation_mode.value
+        response.parameters.update(row.parameters)
+        
+        for input_kpi_id in row.input_kpi_ids:
+            _kpi_id = kpi_manager_pb2.KpiId()
+            _kpi_id.kpi_id.uuid = input_kpi_id
+            response.input_kpi_ids.append(_kpi_id)
+        for output_kpi_id in row.output_kpi_ids:
+            _kpi_id = kpi_manager_pb2.KpiId()
+            _kpi_id.kpi_id.uuid = output_kpi_id
+            response.output_kpi_ids.append(_kpi_id)
+
+        response.batch_min_duration_s = row.batch_min_duration_s
+        response.batch_max_duration_s = row.batch_max_duration_s
+        response.batch_min_size       = row.batch_min_size
+        response.batch_max_size       = row.batch_max_size
+        return response
diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ba68989a066e4638adc12e65289ed50b740731d
--- /dev/null
+++ b/src/analytics/database/Analyzer_DB.py
@@ -0,0 +1,150 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import sqlalchemy_utils
+
+from sqlalchemy     import inspect, or_
+from sqlalchemy.orm import sessionmaker
+
+from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
+from analytics.database.AnalyzerEngine        import AnalyzerEngine
+from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
+
+LOGGER = logging.getLogger(__name__)
+DB_NAME = "tfs_analyzer"        # TODO: export name from environment variable
+
+class AnalyzerDB:
+    def __init__(self):
+        self.db_engine = AnalyzerEngine.get_engine()
+        if self.db_engine is None:
+            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
+            raise Exception('Unable to get SQLAlchemy DB Engine')
+        self.db_name = DB_NAME
+        self.Session = sessionmaker(bind=self.db_engine)
+
+    def create_database(self):
+        if not sqlalchemy_utils.database_exists(self.db_engine.url):
+            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
+            sqlalchemy_utils.create_database(self.db_engine.url)
+
+    def drop_database(self) -> None:
+        if sqlalchemy_utils.database_exists(self.db_engine.url):
+            sqlalchemy_utils.drop_database(self.db_engine.url)
+
+    def create_tables(self):
+        try:
+            AnalyzerModel.metadata.create_all(self.db_engine)     # type: ignore
+            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
+        except Exception as e:
+            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
+            raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
+
+    def verify_tables(self):
+        try:
+            inspect_object = inspect(self.db_engine)
+            if inspect_object.has_table('analyzer', None):
+                LOGGER.info("Table exists in DB: {:}".format(self.db_name))
+        except Exception as e:
+            LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
+
+# ----------------- CRUD OPERATIONS ---------------------
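+# (sketch) Typical usage of this class, as exercised by the frontend servicer and the unit tests:
+#   db = AnalyzerDB(); db.create_database(); db.create_tables()
+#   db.add_row_to_db(AnalyzerModel.ConvertAnalyzerToRow(request))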
+
+    def add_row_to_db(self, row):
+        session = self.Session()
+        try:
+            session.add(row)
+            session.commit()
+            LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
+            return True
+        except Exception as e:
+            session.rollback()
+            if "psycopg2.errors.UniqueViolation" in str(e):
+                LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
+                raise AlreadyExistsException(row.__class__.__name__, row,
+                                             extra_details=["Unique key voilation: {:}".format(e)] )
+            else:
+                LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
+                raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
+        finally:
+            session.close()
+    
+    def search_db_row_by_id(self, model, col_name, id_to_search):
+        session = self.Session()
+        try:
+            entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
+            if entity:
+                # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
+                return entity
+            else:
+                LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
+                print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
+                return None
+        except Exception as e:
+            session.rollback()
+            LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
+            raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
+        finally:
+            session.close()
+    
+    def delete_db_row_by_id(self, model, col_name, id_to_search):
+        session = self.Session()
+        try:
+            record = session.query(model).filter_by(**{col_name: id_to_search}).first()
+            if record:
+                session.delete(record)
+                session.commit()
+                LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
+            else:
+                LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
+                return None
+        except Exception as e:
+            session.rollback()
+            LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
+            raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
+        finally:
+            session.close()
+    
+    def select_with_filter(self, model, filter_object):
+        session = self.Session()
+        try:
+            query = session.query(model)
+            
+            # Apply filters based on the filter_object
+            if filter_object.analyzer_id:
+                query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
+
+            if filter_object.algorithm_names:
+                query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
+
+            if filter_object.input_kpi_ids:
+                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
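+                # '&&' is the PostgreSQL/CockroachDB array-overlap operator: a row matches if its
+                # stored KPI UUID array shares at least one element with the requested list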
+                query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
+
+            if filter_object.output_kpi_ids:
+                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
+                query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
+
+            result = query.all()
+            # Note: if no filters were applied above, this returns every row in the table
+            if result:
+                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") #  - Results: {result}
+            else:
+                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
+            return result
+        except Exception as e:
+            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
+            raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
+        finally:
+            session.close()
diff --git a/src/analytics/database/__init__.py b/src/analytics/database/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/database/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/frontend/Dockerfile b/src/analytics/frontend/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..10499713f318a23e1aeab49c96e8163a5ec147fa
--- /dev/null
+++ b/src/analytics/frontend/Dockerfile
@@ -0,0 +1,70 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-slim
+
+# Install dependencies
+RUN apt-get --yes --quiet --quiet update && \
+    apt-get --yes --quiet --quiet install wget g++ git && \
+    rm -rf /var/lib/apt/lists/*
+
+# Set Python to show logs as they occur
+ENV PYTHONUNBUFFERED=0
+
+# Download the gRPC health probe
+RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
+    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
+    chmod +x /bin/grpc_health_probe
+
+# Get generic Python packages
+RUN python3 -m pip install --upgrade pip
+RUN python3 -m pip install --upgrade setuptools wheel
+RUN python3 -m pip install --upgrade pip-tools
+
+# Get common Python packages
+# Note: this step enables sharing the previous Docker build steps among all the Python components
+WORKDIR /var/teraflow
+COPY common_requirements.in common_requirements.in
+RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
+RUN python3 -m pip install -r common_requirements.txt
+
+# Add common files into working directory
+WORKDIR /var/teraflow/common
+COPY src/common/. ./
+RUN rm -rf proto
+
+# Create proto sub-folder, copy .proto files, and generate Python code
+RUN mkdir -p /var/teraflow/common/proto
+WORKDIR /var/teraflow/common/proto
+RUN touch __init__.py
+COPY proto/*.proto ./
+RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
+RUN rm *.proto
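+# Rewrite the absolute imports emitted by protoc ("import xxx_pb2") into relative imports so the
+# generated modules resolve correctly when imported as part of the common.proto package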
+RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
+
+# Create component sub-folders, get specific Python packages
+RUN mkdir -p /var/teraflow/analytics/frontend
+WORKDIR /var/teraflow/analytics/frontend
+COPY src/analytics/frontend/requirements.in requirements.in
+RUN pip-compile --quiet --output-file=requirements.txt requirements.in
+RUN python3 -m pip install -r requirements.txt
+
+# Add component files into working directory
+WORKDIR /var/teraflow
+COPY src/analytics/__init__.py analytics/__init__.py
+COPY src/analytics/frontend/. analytics/frontend/
+COPY src/analytics/database/. analytics/database/
+
+# Start the service
+ENTRYPOINT ["python", "-m", "analytics.frontend.service"]
diff --git a/src/analytics/frontend/__init__.py b/src/analytics/frontend/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/frontend/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/frontend/client/AnalyticsFrontendClient.py b/src/analytics/frontend/client/AnalyticsFrontendClient.py
new file mode 100644
index 0000000000000000000000000000000000000000..90e95d661d46f24ae5ffaeb7bcfa19b7e1f36526
--- /dev/null
+++ b/src/analytics/frontend/client/AnalyticsFrontendClient.py
@@ -0,0 +1,68 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc, logging
+from common.Constants                         import ServiceNameEnum
+from common.proto.context_pb2                 import Empty
+from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceStub
+from common.proto.analytics_frontend_pb2      import AnalyzerId, Analyzer, AnalyzerFilter, AnalyzerList
+from common.Settings                          import get_service_host, get_service_port_grpc
+from common.tools.grpc.Tools                  import grpc_message_to_json_string
+from common.tools.client.RetryDecorator       import retry, delay_exponential
+
+LOGGER = logging.getLogger(__name__)
+MAX_RETRIES = 10
+DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
+RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
+
+class AnalyticsFrontendClient:
+    def __init__(self, host=None, port=None):
+        if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND)
+        if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
+        self.endpoint     = '{:s}:{:s}'.format(str(host), str(port))
+        LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
+        self.channel      = None
+        self.stub         = None
+        self.connect()
+        LOGGER.debug('Channel created')
+
+    def connect(self):
+        self.channel      = grpc.insecure_channel(self.endpoint)
+        self.stub         = AnalyticsFrontendServiceStub(self.channel)
+
+    def close(self):
+        if self.channel is not None: self.channel.close()
+        self.channel      = None
+        self.stub         = None
+
+    @RETRY_DECORATOR
+    def StartAnalyzer (self, request: Analyzer) -> AnalyzerId: #type: ignore
+        LOGGER.debug('StartAnalyzer: {:s}'.format(grpc_message_to_json_string(request)))
+        response = self.stub.StartAnalyzer(request)
+        LOGGER.debug('StartAnalyzer result: {:s}'.format(grpc_message_to_json_string(response)))
+        return response
+
+    @RETRY_DECORATOR
+    def StopAnalyzer(self, request : AnalyzerId) -> Empty: # type: ignore
+        LOGGER.debug('StopAnalyzer: {:s}'.format(grpc_message_to_json_string(request)))
+        response = self.stub.StopAnalyzer(request)
+        LOGGER.debug('StopAnalyzer result: {:s}'.format(grpc_message_to_json_string(response)))
+        return response
+    
+    @RETRY_DECORATOR
+    def SelectAnalyzers(self, request : AnalyzerFilter) -> AnalyzerList: # type: ignore
+        LOGGER.debug('SelectAnalyzers: {:s}'.format(grpc_message_to_json_string(request)))
+        response = self.stub.SelectAnalyzers(request)
+        LOGGER.debug('SelectAnalyzers result: {:s}'.format(grpc_message_to_json_string(response)))
+        return response
diff --git a/src/analytics/frontend/client/__init__.py b/src/analytics/frontend/client/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/frontend/client/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..d81b9ddbeafeff94c830d48ca5594e775b9ce240
--- /dev/null
+++ b/src/analytics/frontend/requirements.in
@@ -0,0 +1,20 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apscheduler==3.10.4
+confluent-kafka==2.3.*
+psycopg2-binary==2.9.*
+SQLAlchemy==1.4.*
+sqlalchemy-cockroachdb==1.4.*
+SQLAlchemy-Utils==0.38.*
diff --git a/src/analytics/frontend/service/AnalyticsFrontendService.py b/src/analytics/frontend/service/AnalyticsFrontendService.py
new file mode 100644
index 0000000000000000000000000000000000000000..42a7fc9b60418c1c0fc5af6f320ae5c330ce8871
--- /dev/null
+++ b/src/analytics/frontend/service/AnalyticsFrontendService.py
@@ -0,0 +1,28 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common.Constants import ServiceNameEnum
+from common.Settings import get_service_port_grpc
+from common.tools.service.GenericGrpcService import GenericGrpcService
+from common.proto.analytics_frontend_pb2_grpc import add_AnalyticsFrontendServiceServicer_to_server
+from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
+
+class AnalyticsFrontendService(GenericGrpcService):
+    def __init__(self, cls_name: str = __name__):
+        port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
+        super().__init__(port, cls_name=cls_name)
+        self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl()
+    
+    def install_servicers(self):
+        add_AnalyticsFrontendServiceServicer_to_server(self.analytics_frontend_servicer, self.server)
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
new file mode 100644
index 0000000000000000000000000000000000000000..8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3
--- /dev/null
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -0,0 +1,214 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging, grpc, json, queue
+
+from typing          import Dict
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import Producer as KafkaProducer
+from confluent_kafka import KafkaError
+
+from common.tools.kafka.Variables             import KafkaConfig, KafkaTopic
+from common.proto.context_pb2                 import Empty
+from common.method_wrappers.Decorator         import MetricsPool, safe_and_metered_rpc_method
+from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
+from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
+from analytics.database.Analyzer_DB           import AnalyzerDB
+from analytics.database.AnalyzerModel         import Analyzer as AnalyzerModel
+from apscheduler.schedulers.background        import BackgroundScheduler
+from apscheduler.triggers.interval            import IntervalTrigger
+
+LOGGER           = logging.getLogger(__name__)
+METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
+
+class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
+    def __init__(self):
+        LOGGER.info('Init AnalyticsFrontendService')
+        self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
+        self.db_obj         = AnalyzerDB()
+        self.result_queue   = queue.Queue()
+        self.scheduler      = BackgroundScheduler()
+        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                            'group.id'           : 'analytics-frontend',
+                                            'auto.offset.reset'  : 'latest'})
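+        # Consumers sharing the 'analytics-frontend' group id split the response-topic partitions among
+        # themselves; with 'auto.offset.reset'='latest', a consumer without committed offsets starts
+        # reading from the end of the topic (i.e., only messages produced after subscription)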
+
+    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
+    def StartAnalyzer(self, 
+                       request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
+                      ) -> AnalyzerId: # type: ignore
+        LOGGER.info ("At Service gRPC message: {:}".format(request))
+        response = AnalyzerId()
+
+        self.db_obj.add_row_to_db(
+            AnalyzerModel.ConvertAnalyzerToRow(request)
+        )
+        self.PublishStartRequestOnKafka(request)
+        
+        response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
+        return response
+
+    def PublishStartRequestOnKafka(self, analyzer_obj):
+        """
+        Publish an analyzer start request on the Kafka ANALYTICS_REQUEST topic.
+        """
+        analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
+        analyzer_to_generate : Dict = {
+            "algo_name"       : analyzer_obj.algorithm_name,
+            "input_kpis"      : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
+            "output_kpis"     : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
+            "oper_mode"       : analyzer_obj.operation_mode,
+            "thresholds"      : json.loads(analyzer_obj.parameters["thresholds"]),
+            "window_size"     : analyzer_obj.parameters["window_size"],
+            "window_slider"   : analyzer_obj.parameters["window_slider"],
+            # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] 
+        }
+        self.kafka_producer.produce(
+            KafkaTopic.ANALYTICS_REQUEST.value,
+            key      = analyzer_uuid,
+            value    = json.dumps(analyzer_to_generate),
+            callback = self.delivery_callback
+        )
+        LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
+        self.kafka_producer.flush()
+        
+        # self.StartResponseListener(analyzer_uuid)
+
+    def StartResponseListener(self, filter_key=None):
+        """
+        Start the Kafka response listener with APScheduler and return key-value pairs periodically.
+        """
+        LOGGER.info("Starting StartResponseListener")
+        # Schedule the ResponseListener at fixed intervals
+        self.scheduler.add_job(
+            self.response_listener,
+            trigger=IntervalTrigger(seconds=5),
+            args=[filter_key], 
+            id=f"response_listener_{self.listener_topic}",
+            replace_existing=True
+        )
+        self.scheduler.start()
+        LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
+        try:
+            while True:
+                key, value = self.result_queue.get()   # Block until a result is available
+                LOGGER.info("Received response for key: {:}".format(key))
+                yield key, value                        # Yield the result to the calling function
+        except KeyboardInterrupt:
+            LOGGER.warning("Listener stopped manually.")
+        finally:
+            self.StopListener()
+
+    def response_listener(self, filter_key=None):
+        """
+        Poll Kafka messages and put key-value pairs into the queue.
+        """
+        LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
+
+        consumer = self.kafka_consumer
+        consumer.subscribe([self.listener_topic])
+        msg = consumer.poll(2.0)
+        if msg is None:
+            return
+        elif msg.error():
+            if msg.error().code() != KafkaError._PARTITION_EOF:
+                LOGGER.error(f"Kafka error: {msg.error()}")
+            return
+
+        try:
+            key = msg.key().decode('utf-8') if msg.key() else None
+            if filter_key is not None and key == filter_key:
+                value = json.loads(msg.value().decode('utf-8'))
+                LOGGER.info(f"Received key: {key}, value: {value}")
+                self.result_queue.put((key, value))
+            else:
+                LOGGER.info(f"Skipping message with unmatched key: {key}")
+                # value = json.loads(msg.value().decode('utf-8')) # Added for debugging
+                # self.result_queue.put((filter_key, value))             # Added for debugging
+        except Exception as e:
+            LOGGER.error(f"Error processing Kafka message: {e}")
+
+    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
+    def StopAnalyzer(self, 
+                      request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
+                     ) -> Empty:  # type: ignore
+        LOGGER.info ("At Service gRPC message: {:}".format(request))
+        try:
+            analyzer_id_to_delete = request.analyzer_id.uuid
+            self.db_obj.delete_db_row_by_id(
+                AnalyzerModel, "analyzer_id", analyzer_id_to_delete
+            )
+            self.PublishStopRequestOnKafka(analyzer_id_to_delete)
+        except Exception as e:
+            LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e))
+        return Empty()
+
+    def PublishStopRequestOnKafka(self, analyzer_uuid):
+        """
+        Publish an analyzer stop request on the Kafka ANALYTICS_REQUEST topic.
+        """
+        # analyzer_uuid = analyzer_id.analyzer_id.uuid
+        analyzer_to_stop :  Dict = {
+            "algo_name"   : None,
+            "input_kpis"  : [],
+            "output_kpis" : [],
+            "oper_mode"   : None
+        }
+        self.kafka_producer.produce(
+            KafkaTopic.ANALYTICS_REQUEST.value,
+            key      = analyzer_uuid,
+            value    = json.dumps(analyzer_to_stop),
+            callback = self.delivery_callback
+        )
+        LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
+        self.kafka_producer.flush()
+        self.StopListener()
+
+    def StopListener(self):
+        """
+        Gracefully stop the Kafka listener and the scheduler.
+        """
+        LOGGER.info("Stopping Kafka listener...")
+        if self.scheduler.running:
+            self.scheduler.shutdown()
+        LOGGER.info("Kafka listener stopped.")
+
+    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
+    def SelectAnalyzers(self, 
+                         filter : AnalyzerFilter, grpc_context: grpc.ServicerContext # type: ignore
+                        ) -> AnalyzerList:  # type: ignore
+        LOGGER.info("At Service gRPC message: {:}".format(filter))
+        response = AnalyzerList()
+        try:
+            rows = self.db_obj.select_with_filter(AnalyzerModel, filter)
+            try:
+                for row in rows:
+                    response.analyzer_list.append(
+                        AnalyzerModel.ConvertRowToAnalyzer(row)
+                    )
+            except Exception as e:
+                LOGGER.info('Unable to process filter response {:}'.format(e))
+        except Exception as e:
+            LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
+        # Always return the (possibly empty) AnalyzerList so the gRPC call never returns None
+        return response
+
+    def delivery_callback(self, err, msg):
+        if err:
+            LOGGER.debug('Message delivery failed: {:}'.format(err))
+            print       ('Message delivery failed: {:}'.format(err))
+        # else:
+        #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+        #     print('Message delivered to topic {:}'.format(msg.topic()))
diff --git a/src/analytics/frontend/service/__init__.py b/src/analytics/frontend/service/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/frontend/service/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py
new file mode 100644
index 0000000000000000000000000000000000000000..6c331844f45d98095ef98951f3db43a0e2f0c69c
--- /dev/null
+++ b/src/analytics/frontend/service/__main__.py
@@ -0,0 +1,56 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, signal, sys, threading
+from prometheus_client import start_http_server
+from common.Settings import get_log_level, get_metrics_port
+from .AnalyticsFrontendService import AnalyticsFrontendService
+
+terminate = threading.Event()
+LOGGER = None
+
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
+    LOGGER.warning('Terminate signal received')
+    terminate.set()
+
+def main():
+    global LOGGER # pylint: disable=global-statement
+
+    log_level = get_log_level()
+    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
+    LOGGER = logging.getLogger(__name__)
+
+    signal.signal(signal.SIGINT,  signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    LOGGER.info('Starting...')
+
+    # Start metrics server
+    metrics_port = get_metrics_port()
+    start_http_server(metrics_port)
+
+    grpc_service = AnalyticsFrontendService()
+    grpc_service.start()
+
+    # Wait for Ctrl+C or termination signal
+    while not terminate.wait(timeout=1.0): pass
+
+    LOGGER.info('Terminating...')
+    grpc_service.stop()
+
+    LOGGER.info('Bye')
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/analytics/frontend/tests/__init__.py b/src/analytics/frontend/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/frontend/tests/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
new file mode 100644
index 0000000000000000000000000000000000000000..646de962e8a213582fdb7cd1446ab57bda561a96
--- /dev/null
+++ b/src/analytics/frontend/tests/messages.py
@@ -0,0 +1,84 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+import json
+from common.proto.kpi_manager_pb2        import KpiId
+from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId,
+                                                Analyzer, AnalyzerFilter )
+
+def create_analyzer_id():
+    _create_analyzer_id                  = AnalyzerId()
+    # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    return _create_analyzer_id
+
+def create_analyzer():
+    _create_analyzer                              = Analyzer()
+    # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer.algorithm_name               = "Test_Aggergate_and_Threshold"
+    _create_analyzer.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+    
+    _kpi_id = KpiId()
+    # input IDs to analyze
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "6e22f180-ba28-4641-b190-2287bf448888"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid              = "1e22f180-ba28-4641-b190-2287bf446666"
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.input_kpi_ids.append(_kpi_id)
+    # output IDs after analysis
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    _kpi_id.kpi_id.uuid              = str(uuid.uuid4())
+    _create_analyzer.output_kpi_ids.append(_kpi_id)
+    # parameter
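+    # Each threshold entry maps an aggregated metric name to an acceptable (lower, upper) range; values
+    # outside that range are expected to be flagged by the analytics backend.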
+    _threshold_dict = {
+        # 'avg_value'   :(20, 30), 'min_value'   :(00, 10), 'max_value'   :(45, 50),
+        'first_value' :(00, 10), 'last_value'  :(40, 50), 'stdev_value':(00, 10)}
+    _create_analyzer.parameters['thresholds']      = json.dumps(_threshold_dict)
+    _create_analyzer.parameters['window_size']     = "60 seconds"     # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" 
+    _create_analyzer.parameters['window_slider']   = "30 seconds"     # should be less than window size
+    _create_analyzer.parameters['store_aggregate'] = str(False)       # TRUE to store. Not implemented yet
+
+    return _create_analyzer
+
+def create_analyzer_filter():
+    _create_analyzer_filter           = AnalyzerFilter()
+
+    _analyzer_id_obj                  = AnalyzerId()
+    # _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
+    _analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
+    _create_analyzer_filter.analyzer_id.append(_analyzer_id_obj)
+
+    _create_analyzer_filter.algorithm_names.append('Test_Aggergate_and_Threshold')
+
+    # _input_kpi_id_obj                 = KpiId()
+    # _input_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
+    # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
+    # another input kpi Id
+    # _input_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
+    # _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
+
+    # _output_kpi_id_obj                = KpiId()
+    # _output_kpi_id_obj.kpi_id.uuid    = str(uuid.uuid4())
+    # _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
+    # # another output kpi Id
+    # _output_kpi_id_obj.kpi_id.uuid     = str(uuid.uuid4())
+    # _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj)
+
+    return _create_analyzer_filter
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
new file mode 100644
index 0000000000000000000000000000000000000000..d2428c01fb021f71a884d9a99c446bfef6e66559
--- /dev/null
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -0,0 +1,134 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+import json
+import pytest
+import logging
+import threading
+
+from common.Constants         import ServiceNameEnum
+from common.proto.context_pb2 import Empty
+from common.Settings          import ( get_service_port_grpc, get_env_var_name, 
+                                      ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
+
+from common.tools.kafka.Variables                        import KafkaTopic
+from common.proto.analytics_frontend_pb2                 import AnalyzerId, AnalyzerList
+from analytics.frontend.client.AnalyticsFrontendClient   import AnalyticsFrontendClient
+from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
+from analytics.frontend.tests.messages                   import ( create_analyzer_id, create_analyzer,
+                                                                 create_analyzer_filter )
+from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
+from apscheduler.schedulers.background                   import BackgroundScheduler
+from apscheduler.triggers.interval                       import IntervalTrigger
+
+
+###########################
+# Tests Setup
+###########################
+
+LOCAL_HOST = '127.0.0.1'
+
+ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND))
+os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
+os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
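+# These variables make get_service_host()/get_service_port_grpc() resolve to the in-process gRPC service
+# started by the 'analyticsFrontend_service' fixture below, so the client under test connects locally.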
+
+LOGGER = logging.getLogger(__name__)
+
+@pytest.fixture(scope='session')
+def analyticsFrontend_service():
+    LOGGER.info('Initializing AnalyticsFrontendService...')
+
+    _service = AnalyticsFrontendService()
+    _service.start()
+
+    # yield the server, when test finishes, execution will resume to stop it
+    LOGGER.info('Yielding AnalyticsFrontendService...')
+    yield _service
+
+    LOGGER.info('Terminating AnalyticsFrontendService...')
+    _service.stop()
+
+    LOGGER.info('Terminated AnalyticsFrontendService...')
+
+@pytest.fixture(scope='session')
+def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendService):
+    LOGGER.info('Initializing AnalyticsFrontendClient...')
+
+    _client = AnalyticsFrontendClient()
+
+    # yield the server, when test finishes, execution will resume to stop it
+    LOGGER.info('Yielding AnalyticsFrontendClient...')
+    yield _client
+
+    LOGGER.info('Closing AnalyticsFrontendClient...')
+    _client.close()
+
+    LOGGER.info('Closed AnalyticsFrontendClient...')
+
+
+###########################
+# Tests Implementation of Analytics Frontend
+###########################
+
+# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+# ----- core functionality test -----
+# def test_StartAnalytics(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
+#     response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, AnalyzerId)
+
+# To test start and stop listener together
+def test_StartStopAnalyzers(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
+    LOGGER.info('--> StartAnalyzer')
+    added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+    LOGGER.debug(str(added_analyzer_id))
+    LOGGER.info(' --> Calling StartResponseListener... ')
+    class_obj = AnalyticsFrontendServiceServicerImpl()
+    response =  class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
+    LOGGER.debug(response)
+    LOGGER.info("waiting for timer to complete ...")
+    time.sleep(3)
+    LOGGER.info('--> StopAnalyzer')
+    response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
+    LOGGER.debug(str(response))
+
+# def test_SelectAnalytics(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+#     response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, AnalyzerList)
+
+# def test_StopAnalytic(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
+#     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, Empty)
+
+# def test_ResponseListener():
+#         LOGGER.info(' >>> test_ResponseListener START <<< ')
+#         analyzer_id = create_analyzer_id()
+#         LOGGER.debug("Starting Response Listener for Analyzer ID: {:}".format(analyzer_id.analyzer_id.uuid))
+#         class_obj = AnalyticsFrontendServiceServicerImpl()
+#         for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
+#             LOGGER.debug(response)
+#             assert isinstance(response, tuple)
\ No newline at end of file
diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..8ff30ddaad25c39713f2e6f68c8d9aebed74dad0
--- /dev/null
+++ b/src/analytics/requirements.in
@@ -0,0 +1,21 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: PySpark needs a Java (11+) runtime; install it through the OS package manager, it is not a pip package
+# java==11.0.*
+pyspark==3.5.2
+confluent-kafka==2.3.*
+psycopg2-binary==2.9.*
+SQLAlchemy==1.4.*
+sqlalchemy-cockroachdb==1.4.*
+SQLAlchemy-Utils==0.38.*
diff --git a/src/analytics/tests/__init__.py b/src/analytics/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3
--- /dev/null
+++ b/src/analytics/tests/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py
new file mode 100644
index 0000000000000000000000000000000000000000..58e7d0167044bb461e66b053dcb3999641ea8419
--- /dev/null
+++ b/src/analytics/tests/test_analytics_db.py
@@ -0,0 +1,28 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+from analytics.database.Analyzer_DB import AnalyzerDB
+
+LOGGER = logging.getLogger(__name__)
+
+def test_verify_databases_and_tables():
+    LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
+    AnalyzerDBobj = AnalyzerDB()
+    # AnalyzerDBobj.drop_database()
+    # AnalyzerDBobj.verify_tables()
+    AnalyzerDBobj.create_database()
+    AnalyzerDBobj.create_tables()
+    AnalyzerDBobj.verify_tables()
diff --git a/src/common/Constants.py b/src/common/Constants.py
index 4b2bced95fca28abdfd729492acc1117cdf3e8d9..74490321f9c8ec016fa4b48b583e2217c61710ec 100644
--- a/src/common/Constants.py
+++ b/src/common/Constants.py
@@ -66,6 +66,8 @@ class ServiceNameEnum(Enum):
     KPIVALUEWRITER         = 'kpi-value-writer'
     TELEMETRYFRONTEND      = 'telemetry-frontend'
     TELEMETRYBACKEND       = 'telemetry-backend'
+    ANALYTICSFRONTEND      = 'analytics-frontend'
+    ANALYTICSBACKEND       = 'analytics-backend'
 
     # Used for test and debugging only
     DLT_GATEWAY    = 'dltgateway'
@@ -100,6 +102,8 @@ DEFAULT_SERVICE_GRPC_PORTS = {
     ServiceNameEnum.KPIVALUEWRITER         .value : 30030,
     ServiceNameEnum.TELEMETRYFRONTEND      .value : 30050,
     ServiceNameEnum.TELEMETRYBACKEND       .value : 30060,
+    ServiceNameEnum.ANALYTICSFRONTEND      .value : 30080,
+    ServiceNameEnum.ANALYTICSBACKEND       .value : 30090,
 
     # Used for test and debugging only
     ServiceNameEnum.DLT_GATEWAY   .value : 50051,
diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index 5ada88a1ea0a7eae31eda741d81757fa624521de..fc43c315114e7b51c4e2604afbb14e165796e7c5 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -14,7 +14,6 @@
 
 import logging
 from enum import Enum
-from confluent_kafka import KafkaException
 from confluent_kafka.admin import AdminClient, NewTopic
 from common.Settings import get_setting
 
@@ -26,11 +25,11 @@ class KafkaConfig(Enum):
 
     @staticmethod
     def get_kafka_address() -> str:
-        kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
-        if kafka_server_address is None:
-            KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
-            KFK_PORT             = get_setting('KFK_SERVER_PORT')
-            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
+        # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
+        # if kafka_server_address is None:
+        KFK_NAMESPACE        = get_setting('KFK_NAMESPACE')
+        KFK_PORT             = get_setting('KFK_SERVER_PORT')
+        kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
         return kafka_server_address
         
     @staticmethod
@@ -41,11 +40,14 @@ class KafkaConfig(Enum):
 
 
 class KafkaTopic(Enum):
-    REQUEST  = 'topic_request' 
-    RESPONSE = 'topic_response'
-    RAW      = 'topic_raw' 
-    LABELED  = 'topic_labeled'
-    VALUE    = 'topic_value'
+    # TODO: Later to be populated from ENV variable.
+    REQUEST            = 'topic_request' 
+    RESPONSE           = 'topic_response'
+    RAW                = 'topic_raw' 
+    LABELED            = 'topic_labeled'
+    VALUE              = 'topic_value'
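+    # Topics for the Analytics component: the Frontend publishes start/stop requests on
+    # ANALYTICS_REQUEST and listens for results on ANALYTICS_RESPONSE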
+    ANALYTICS_REQUEST  = 'topic_request_analytics'
+    ANALYTICS_RESPONSE = 'topic_response_analytics'
 
     @staticmethod
     def create_all_topics() -> bool:
diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py
index dff406de666b5f68539b8897fa26e0b3ad51286b..0fce7e3d36cf2f03a18f311c815719a4f17b2869 100644
--- a/src/kpi_manager/database/KpiEngine.py
+++ b/src/kpi_manager/database/KpiEngine.py
@@ -16,8 +16,6 @@ import logging, sqlalchemy
 from common.Settings import get_setting
 
 LOGGER = logging.getLogger(__name__)
-
-# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
 CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
 
 class KpiEngine:
@@ -33,12 +31,10 @@ class KpiEngine:
             CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
             crdb_uri = CRDB_URI_TEMPLATE.format(
                 CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        # crdb_uri = CRDB_URI_TEMPLATE.format(
-        #         CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
         try:
             engine = sqlalchemy.create_engine(crdb_uri, echo=False)
             LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri))
         except: # pylint: disable=bare-except # pragma: no cover
             LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
             return None # type: ignore
-        return engine 
+        return engine
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 95662969be4f9191e5f3748490a6bc47167e6243..6ab841238f446a2895cd163fab4b7eb05eaa3176 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -28,12 +28,8 @@ from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from common.method_wrappers.Decorator import MetricsPool
 from common.tools.service.GenericGrpcService import GenericGrpcService
 
-
-
 LOGGER             = logging.getLogger(__name__)
 METRICS_POOL       = MetricsPool('TelemetryBackend', 'backendService')
-# EXPORTER_ENDPOINT  = "http://10.152.183.2:9100/metrics"
-
 
 class TelemetryBackendService(GenericGrpcService):
     """
diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py
index 4ad86733141966894070b78b3ac227890293fa7c..9ec9e191fd22e07da46f80214ade0ac516032433 100644
--- a/src/telemetry/backend/service/__main__.py
+++ b/src/telemetry/backend/service/__main__.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 
 import logging, signal, sys, threading
-from common.Settings import get_log_level
+from prometheus_client import start_http_server
+from common.Settings import get_log_level, get_metrics_port
 from .TelemetryBackendService import TelemetryBackendService
 
 terminate = threading.Event()
@@ -27,13 +28,17 @@ def main():
     global LOGGER # pylint: disable=global-statement
 
     log_level = get_log_level()
-    logging.basicConfig(level=log_level)
+    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
     LOGGER = logging.getLogger(__name__)
 
     signal.signal(signal.SIGINT,  signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    LOGGER.debug('Starting...')
+    LOGGER.info('Starting...')
+
+    # Start metrics server
+    metrics_port = get_metrics_port()
+    start_http_server(metrics_port)
 
     grpc_service = TelemetryBackendService()
     grpc_service.start()
@@ -41,10 +46,10 @@ def main():
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
 
-    LOGGER.debug('Terminating...')
+    LOGGER.info('Terminating...')
     grpc_service.stop()
 
-    LOGGER.debug('Bye')
+    LOGGER.info('Bye')
     return 0
 
 if __name__ == '__main__':
diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py
index 18ec2ddbc671302b642db04b673038659da7acde..7c8620faf25e695e7f971bce78be9ad208a7701b 100644
--- a/src/telemetry/database/TelemetryEngine.py
+++ b/src/telemetry/database/TelemetryEngine.py
@@ -16,27 +16,25 @@ import logging, sqlalchemy
 from common.Settings import get_setting
 
 LOGGER = logging.getLogger(__name__)
-
-# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
 CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
 
 class TelemetryEngine:
     @staticmethod
     def get_engine() -> sqlalchemy.engine.Engine:
         crdb_uri = get_setting('CRDB_URI', default=None)
-        if crdb_uri is None:        
-            CRDB_NAMESPACE = "crdb"
-            CRDB_SQL_PORT  = "26257"
-            CRDB_DATABASE  = "tfs-telemetry"
-            CRDB_USERNAME  = "tfs"
-            CRDB_PASSWORD  = "tfs123"
-            CRDB_SSLMODE   = "require"
+        if crdb_uri is None:
+            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
+            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
+            CRDB_DATABASE  = "tfs-telemetry"             # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
+            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
+            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
+            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
             crdb_uri = CRDB_URI_TEMPLATE.format(
-                    CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
+                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
         try:
             engine = sqlalchemy.create_engine(crdb_uri, echo=False)
             LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri))
         except: # pylint: disable=bare-except # pragma: no cover
             LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
             return None # type: ignore
-        return engine # type: ignore
+        return engine
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index 2b872dba33bbe1434b68d5b5d2449e0b228312f7..b73d9fa952ee42aeb7adb8f3c0b2e4a3ba7f3e09 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -89,7 +89,14 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                       request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
                      ) -> Empty:  # type: ignore
         LOGGER.info ("gRPC message: {:}".format(request))
-        self.PublishStopRequestOnKafka(request)
+        try:
+            collector_to_delete = request.collector_id.uuid
+            self.tele_db_obj.delete_db_row_by_id(
+                CollectorModel, "collector_id", collector_to_delete
+            )
+            self.PublishStopRequestOnKafka(request)
+        except Exception as e:
+            LOGGER.error('Unable to delete collector. Error: {:}'.format(e))
         return Empty()
 
     def PublishStopRequestOnKafka(self, collector_id):