Commit 68e713f6 authored by Lluis Gifre Renom

Merge branch 'feat/188-cttc-new-analytics-component' into 'develop'

(CTTC) New Analytics Component

See merge request !261
parents f3ea42c1 a94fe1fd
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
Showing 740 additions and 21 deletions
@@ -176,3 +176,6 @@ libyang/
# Other logs
**/logs/*.log.*
+# PySpark checkpoints
+src/analytics/.spark/*
@@ -33,7 +33,7 @@ export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device pathcomp service slice n
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
# Uncomment to activate Monitoring Framework (new)
-#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
+#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"
# Uncomment to activate BGP-LS Speaker
#export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
......
@@ -182,7 +182,19 @@ kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka data for KPI and Telemetry microservices"
echo "Create secret with CockroachDB data for Analytics microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices"
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
@@ -264,7 +276,7 @@ for COMPONENT in $TFS_COMPONENTS; do
if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then
$DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG"
elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log"
$DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG"
@@ -287,7 +299,7 @@ for COMPONENT in $TFS_COMPONENTS; do
echo " Pushing Docker image to '$TFS_REGISTRY_IMAGES'..."
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ] ; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log"
@@ -338,7 +350,7 @@ for COMPONENT in $TFS_COMPONENTS; do
cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST"
fi
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4)
sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
......
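The crdb-analytics secret created above is consumed by the analytics frontend through envFrom/secretRef (see the manifest below). A minimal sketch of how a component could assemble its CockroachDB URI from those injected variables, following the cockroachdb:// URI format used by the test scripts further down; the helper name and host pattern are illustrative assumptions, not code from this commit:

import os

def get_crdb_uri() -> str:
    # All values below are injected from the crdb-analytics Kubernetes secret.
    namespace = os.environ['CRDB_NAMESPACE']
    sql_port  = os.environ['CRDB_SQL_PORT']
    database  = os.environ['CRDB_DATABASE']    # tfs_analytics in this deployment
    username  = os.environ['CRDB_USERNAME']
    password  = os.environ['CRDB_PASSWORD']
    sslmode   = os.environ['CRDB_SSLMODE']     # 'require' in this deployment
    # Assumed host pattern: the public CockroachDB service in its namespace.
    host = 'cockroachdb-public.{:s}.svc.cluster.local'.format(namespace)
    return 'cockroachdb://{:s}:{:s}@{:s}:{:s}/{:s}?sslmode={:s}'.format(
        username, password, host, sql_port, database, sslmode)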
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: analyticsservice
spec:
selector:
matchLabels:
app: analyticsservice
#replicas: 1
template:
metadata:
labels:
app: analyticsservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: frontend
image: labs.etsi.org:5050/tfs/controller/analytics-frontend:latest
imagePullPolicy: Always
ports:
- containerPort: 30080
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: crdb-analytics
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30080"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30080"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
- name: backend
image: labs.etsi.org:5050/tfs/controller/analytics-backend:latest
imagePullPolicy: Always
ports:
- containerPort: 30090
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30090"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30090"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: analyticsservice
labels:
app: analyticsservice
spec:
type: ClusterIP
selector:
app: analyticsservice
ports:
- name: frontend-grpc
protocol: TCP
port: 30080
targetPort: 30080
- name: backend-grpc
protocol: TCP
port: 30090
targetPort: 30090
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: analyticsservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: analyticsservice
minReplicas: 1
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
#behavior:
# scaleDown:
# stabilizationWindowSeconds: 30
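Both containers expose the standard gRPC health checking service, which is what /bin/grpc_health_probe queries in the probes above. Readiness can also be verified from inside the cluster with a short Python client; this is a sketch assuming the grpcio-health-checking package and the default tfs namespace used by the deploy scripts:

import grpc
from grpc_health.v1 import health_pb2, health_pb2_grpc

# Service name and ports come from the analyticsservice Service manifest above.
for port in (30080, 30090):  # frontend-grpc, backend-grpc
    channel = grpc.insecure_channel('analyticsservice.tfs.svc.cluster.local:{:d}'.format(port))
    stub = health_pb2_grpc.HealthStub(channel)
    status = stub.Check(health_pb2.HealthCheckRequest(service='')).status
    print(port, health_pb2.HealthCheckResponse.ServingStatus.Name(status))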
@@ -53,9 +53,9 @@ spec:
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
-value: PLAINTEXT://localhost:9092
+value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: kafka-broker
ports:
-- containerPort: 9092
\ No newline at end of file
+- containerPort: 9092
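The advertised-listener fix matters because Kafka clients bootstrap to the address the broker advertises, not the one they first dialed; advertising PLAINTEXT://localhost:9092 breaks every consumer outside the broker pod, while the cluster-local DNS name works from any namespace. A quick connectivity check with confluent-kafka (a sketch using the in-cluster address configured above):

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka-service.kafka.svc.cluster.local:9092'})
metadata = admin.list_topics(timeout=10)  # raises KafkaException if the broker is unreachable
print(sorted(metadata.topics.keys()))     # topics visible through the advertised listener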
@@ -30,21 +30,25 @@ message AnalyzerId {
}
enum AnalyzerOperationMode {
-ANALYZEROPERATIONMODE_BATCH = 0;
-ANALYZEROPERATIONMODE_STREAMING = 1;
+ANALYZEROPERATIONMODE_UNSPECIFIED = 0;
+ANALYZEROPERATIONMODE_BATCH = 1;
+ANALYZEROPERATIONMODE_STREAMING = 2;
}
// duration field may be added in analyzer...
message Analyzer {
-string algorithm_name = 1; // The algorithm to be executed
-repeated kpi_manager.KpiId input_kpi_ids = 2; // The KPI Ids to be processed by the analyzer
-repeated kpi_manager.KpiId output_kpi_ids = 3; // The KPI Ids produced by the analyzer
-AnalyzerOperationMode operation_mode = 4; // Operation mode of the analyzer
-// In batch mode...
-float batch_min_duration_s = 5; // ..., min duration to collect before executing batch
-float batch_max_duration_s = 6; // ..., max duration collected to execute the batch
-uint64 batch_min_size = 7; // ..., min number of samples to collect before executing batch
-uint64 batch_max_size = 8; // ..., max number of samples collected to execute the batch
+AnalyzerId analyzer_id = 1;
+string algorithm_name = 2; // The algorithm to be executed
+float duration_s = 3; // Terminate the analytics thread after this duration (seconds); 0 = run indefinitely
+repeated kpi_manager.KpiId input_kpi_ids = 4; // The KPI Ids to be processed by the analyzer
+repeated kpi_manager.KpiId output_kpi_ids = 5; // The KPI Ids produced by the analyzer
+AnalyzerOperationMode operation_mode = 6; // Operation mode of the analyzer
+map<string, string> parameters = 7; // Dictionary of (key, value) pairs, e.g. (window_size, 10)
+// In batch mode...
+float batch_min_duration_s = 8; // ..., min duration to collect before executing batch
+float batch_max_duration_s = 9; // ..., max duration collected to execute the batch
+uint64 batch_min_size = 10; // ..., min number of samples to collect before executing batch
+uint64 batch_max_size = 11; // ..., max number of samples collected to execute the batch
}
message AnalyzerFilter {
......
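A hedged sketch of populating the revised Analyzer message from Python. The generated module name follows the repository's common.proto convention but is an assumption, as are the inner field layouts of AnalyzerId and KpiId, which this diff does not show:

from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerOperationMode  # assumed module name

analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = 'some-analyzer-uuid'  # AnalyzerId field path assumed
analyzer.algorithm_name = 'aggregate_and_threshold'           # illustrative algorithm name
analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
analyzer.duration_s = 0                                       # 0 = run indefinitely
analyzer.parameters['window_size']   = '60 seconds'           # map<string, string> field
analyzer.parameters['window_slider'] = '30 seconds'
input_kpi = analyzer.input_kpi_ids.add()                      # repeated kpi_manager.KpiId
input_kpi.kpi_id.uuid = 'some-kpi-uuid'                       # KpiId field path assumed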
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/tests/test_analytics_db.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/frontend/tests/test_frontend.py
@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
-    telemetry/backend/tests/testTelemetryBackend.py
+python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
+    telemetry/backend/tests/test_TelemetryBackend.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c backend
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c frontend
# How to locally run and test the Analytics service (to be added soon)
### Pre-requisites
The following requirements should be fulfilled before executing the Analytics service.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/backend
WORKDIR /var/teraflow/analytics/backend
COPY src/analytics/backend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/backend/. analytics/backend/
# Start the service
ENTRYPOINT ["python", "-m", "analytics.backend.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
pyspark==3.5.2
confluent-kafka==2.3.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import threading
from common.tools.service.GenericGrpcService import GenericGrpcService
from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService):
"""
Listens on the Kafka ANALYTICS_REQUEST topic for analyzer start/stop requests and manages the corresponding Spark streamer threads.
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init AnalyticsBackendService')
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name)
self.running_threads = {} # To keep track of all running analyzers
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
def StartSparkStreamer(self, analyzer_uuid, analyzer):
kpi_list = analyzer['input_kpis']
oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds']
window_size = analyzer['window_size']
window_slider = analyzer['window_slider']
print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
try:
stop_event = threading.Event()
thread = threading.Thread(target=SparkStreamer,
args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
window_size, window_slider, None ))
self.running_threads[analyzer_uuid] = (thread, stop_event)
thread.start()
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
def StopRequestListener(self, threadInfo: tuple):
try:
thread, stop_event = threadInfo
stop_event.set()
thread.join()
print ("Terminating Analytics backend RequestListener")
LOGGER.info("Terminating Analytics backend RequestListener")
return True
except Exception as e:
print ("Failed to terminate analytics backend {:}".format(e))
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def install_services(self):
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def RequestListener(self, stop_event):
"""
listener for requests on Kafka topic.
"""
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while not stop_event.is_set():
receive_msg = consumer.poll(2.0)
if receive_msg is None:
continue
elif receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
break
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Received Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.TerminateAnalyzerBackend(analyzer_uuid)
else:
self.StartSparkStreamer(analyzer_uuid, analyzer)
LOGGER.debug("Stop Event activated. Terminating...")
print ("Stop Event activated. Terminating...")
def TerminateAnalyzerBackend(self, analyzer_uuid):
if analyzer_uuid in self.running_threads:
try:
thread, stop_event = self.running_threads[analyzer_uuid]
stop_event.set()
thread.join()
del self.running_threads[analyzer_uuid]
print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
return True
except Exception as e:
LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
return False
else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
# generate confirmation towards frontend
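For reference, a hedged sketch of the request message RequestListener consumes from the ANALYTICS_REQUEST topic: the Kafka key is the analyzer UUID and the JSON value carries the fields unpacked in StartSparkStreamer above (a value with algo_name and oper_mode set to null triggers TerminateAnalyzerBackend instead). All concrete values are illustrative:

import json
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

producer = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
analyzer = {
    'algo_name'    : 'aggregate_and_threshold',  # illustrative algorithm name
    'oper_mode'    : 2,                          # streaming
    'input_kpis'   : ['some-kpi-uuid'],          # KPI Ids to process
    'thresholds'   : {'avg_value': [20, 30]},    # (TH-Fail, TH-RAISE) per aggregated column
    'window_size'  : '60 seconds',
    'window_slider': '30 seconds',
}
producer.produce(KafkaTopic.ANALYTICS_REQUEST.value,
                 key='some-analyzer-uuid', value=json.dumps(analyzer))
producer.flush()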
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import from_json, col, window, avg, min, max, first, last, stddev, when, round
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
LOGGER = logging.getLogger(__name__)
def DefiningSparkSession():
# Create a Spark session with a specific Spark version (3.5.0)
return SparkSession.builder \
.appName("Analytics") \
.config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.getOrCreate()
def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in common with inputs (bootstrap server, subscribe, startingOffset and failOnDataLoss with default values)
return {
# "kafka.bootstrap.servers": '127.0.0.1:9092',
"kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
"subscribe" : KafkaTopic.VALUE.value,
"startingOffsets" : 'latest',
"failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss
}
def DefiningRequestSchema():
return StructType([
StructField("time_stamp" , StringType() , True),
StructField("kpi_id" , StringType() , True),
StructField("kpi_value" , DoubleType() , True)
])
def GetAggregations(oper_list):
# Define the possible aggregation functions
agg_functions = {
'avg' : round(avg ("kpi_value"), 3) .alias("avg_value"),
'min' : round(min ("kpi_value"), 3) .alias("min_value"),
'max' : round(max ("kpi_value"), 3) .alias("max_value"),
'first': round(first ("kpi_value"), 3) .alias("first_value"),
'last' : round(last ("kpi_value"), 3) .alias("last_value"),
'stdev': round(stddev ("kpi_value"), 3) .alias("stdev_value")
}
return [agg_functions[op] for op in oper_list if op in agg_functions] # Filter and return only the selected aggregations
def ApplyThresholds(aggregated_df, thresholds):
# Apply thresholds (TH-Fail and TH-RAISE) based on the thresholds dictionary on the aggregated DataFrame.
# Loop through each column name and its associated thresholds
for col_name, (fail_th, raise_th) in thresholds.items():
# Apply TH-Fail condition (if column value is less than the fail threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_FAIL",
when(col(col_name) < fail_th, True).otherwise(False)
)
# Apply TH-RAISE condition (if column value is greater than the raise threshold)
aggregated_df = aggregated_df.withColumn(
f"{col_name}_THRESHOLD_RAISE",
when(col(col_name) > raise_th, True).otherwise(False)
)
return aggregated_df
def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
window_size=None, win_slide_duration=None, time_stamp_col=None):
"""
Method to perform Spark operations on a Kafka stream.
NOTE: the Kafka topic to be processed should have at least one row before initiating the Spark session.
"""
kafka_consumer_params = SettingKafkaConsumerParams() # Define the Kafka consumer parameters
schema = DefiningRequestSchema() # Define the schema for the incoming JSON data
spark = DefiningSparkSession() # Define the spark session with app name and spark version
# extra options default assignment
if window_size is None: window_size = "60 seconds" # default
if win_slide_duration is None: win_slide_duration = "30 seconds" # default
if time_stamp_col is None: time_stamp_col = "time_stamp" # default
try:
# Read data from Kafka
raw_stream_data = spark \
.readStream \
.format("kafka") \
.options(**kafka_consumer_params) \
.load()
# Convert the value column from Kafka to a string
stream_data = raw_stream_data.selectExpr("CAST(value AS STRING)")
# Parse the JSON string into a DataFrame with the defined schema
parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
# Select the parsed fields
final_stream_data = parsed_stream_data.select("parsed_value.*")
# Convert the time_stamp to proper timestamp (assuming it's in ISO format)
final_stream_data = final_stream_data.withColumn(time_stamp_col, col(time_stamp_col).cast(TimestampType()))
# Filter the stream to only include rows where the kpi_id is in the kpi_list
filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
# Define a window for aggregation
windowed_stream_data = filtered_stream_data \
.groupBy(
window( col(time_stamp_col),
window_size, slideDuration=win_slide_duration
),
col("kpi_id")
) \
.agg(*GetAggregations(oper_list))
# Apply thresholds to the aggregated data
thresholded_stream_data = ApplyThresholds(windowed_stream_data, thresholds)
# --- This will write output on console: FOR TESTING PURPOSES
# Start the Spark streaming query
# query = thresholded_stream_data \
# .writeStream \
# .outputMode("update") \
# .format("console")
# --- This will write output to Kafka: ACTUAL IMPLEMENTATION
query = thresholded_stream_data \
.selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
.option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \
.option("checkpointLocation", "analytics/.spark/checkpoint") \
.outputMode("update")
# Start the query execution
queryHandler = query.start()
# Loop to check for stop event flag. To be set by stop collector method.
while True:
if stop_event.is_set():
LOGGER.debug("Stop Event activated. Terminating in 5 seconds...")
print ("Stop Event activated. Terminating in 5 seconds...")
time.sleep(5)
queryHandler.stop()
break
time.sleep(5)
except Exception as e:
print("Error in Spark streaming process: {:}".format(e))
LOGGER.debug("Error in Spark streaming process: {:}".format(e))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsBackendService import AnalyticsBackendService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
grpc_service = AnalyticsBackendService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
grpc_service.stop()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())