Skip to content
Snippets Groups Projects
Commit 2c1fc9c8 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytic, manifest files and deployment script

Deployment Script
- Analytics is added in new montioring component to be deployed with TFS.
- condition is added in analytics component TFS deployment script.
- Analytics show logs files are added.
Manifest
- modifed KAFKA_ADVERTISED_LISTENERS with internal kafka service address in kafka.yml
- analyticsservice.yml added.
Analytics
- grpc service is added in backend
- Docker file for backend and frontend is added.
- main file is updaed
- small changes in requirements.in files
Kafka Variables
- get_kafka_address method logic is updated.
parent c1ee28db
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
Showing
with 283 additions and 23 deletions
......@@ -33,7 +33,7 @@ export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device pathcomp service slice n
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
# Uncomment to activate Monitoring Framework (new)
#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api"
#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics"
# Uncomment to activate BGP-LS Speaker
#export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker"
......
......@@ -78,7 +78,7 @@ function kafka_deploy() {
echo "Apache Kafka"
echo ">>> Checking if Apache Kafka is deployed ... "
if [ "$KFK_REDEPLOY" = "YES" ]; then
if [ "$KFK_REDEPLOY" == "YES" ]; then
echo ">>> Redeploying kafka namespace"
kafka_deploy
elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then
......
......@@ -194,7 +194,7 @@ kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka data for KPI and Telemetry microservices"
echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices"
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
......@@ -276,7 +276,7 @@ for COMPONENT in $TFS_COMPONENTS; do
if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then
$DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG"
elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log"
$DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG"
......@@ -299,7 +299,7 @@ for COMPONENT in $TFS_COMPONENTS; do
echo " Pushing Docker image to '$TFS_REGISTRY_IMAGES'..."
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ] ; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log"
......@@ -350,7 +350,7 @@ for COMPONENT in $TFS_COMPONENTS; do
cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST"
fi
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ] || [ "$COMPONENT" == "analytics" ]; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4)
sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: analyticsservice
spec:
selector:
matchLabels:
app: analyticsservice
#replicas: 1
template:
metadata:
labels:
app: analyticsservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: frontend
image: labs.etsi.org:5050/tfs/controller/analytics-frontend:latest
imagePullPolicy: Always
ports:
- containerPort: 30080
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: crdb-analytics
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30050"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30050"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
- name: backend
image: labs.etsi.org:5050/tfs/controller/analytics-backend:latest
imagePullPolicy: Always
ports:
- containerPort: 30090
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30060"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30060"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: analyticsservice
labels:
app: analyticsservice
spec:
type: ClusterIP
selector:
app: analyticsservice
ports:
- name: frontend-grpc
protocol: TCP
port: 30080
targetPort: 30080
- name: backend-grpc
protocol: TCP
port: 30090
targetPort: 30090
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: analyticsservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: analyticsservice
minReplicas: 1
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
#behavior:
# scaleDown:
# stabilizationWindowSeconds: 30
......@@ -53,9 +53,9 @@ spec:
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://localhost:9092
value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: kafka-broker
ports:
- containerPort: 9092
\ No newline at end of file
- containerPort: 9092
......@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
telemetry/backend/tests/testTelemetryBackend.py
python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
telemetry/backend/tests/test_TelemetryBackend.py
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c backend
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/analyticsservice -c frontend
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/backend
WORKDIR /var/teraflow/analytics/backend
COPY src/analytics/backend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/backend/. analytics/backend/
# Start the service
ENTRYPOINT ["python", "-m", "analytics.backend.service"]
......@@ -12,6 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
java==11.0.*
pyspark==3.5.2
confluent-kafka==2.3.*
\ No newline at end of file
......@@ -21,6 +21,9 @@ from analytics.backend.service.SparkStreaming import SparkStreamer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
LOGGER = logging.getLogger(__name__)
......@@ -29,6 +32,9 @@ class AnalyticsBackendService(GenericGrpcService):
Class listens for ...
"""
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init AnalyticsBackendService')
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name)
self.running_threads = {} # To keep track of all running analyzers
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
......@@ -72,7 +78,7 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False
def StartRequestListener(self)->tuple:
def install_services(self):
stop_event = threading.Event()
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
......
......@@ -55,7 +55,7 @@ RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/frontend
WORKDIR /var/analyticstelemetry/frontend
WORKDIR /var/teraflow/analytics/frontend
COPY src/analytics/frontend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.* # .4
apscheduler==3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
......
......@@ -13,7 +13,8 @@
# limitations under the License.
import logging, signal, sys, threading
from common.Settings import get_log_level
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
......@@ -28,13 +29,17 @@ def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.debug('Starting...')
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
grpc_service = AnalyticsFrontendService()
grpc_service.start()
......
......@@ -14,7 +14,6 @@
import logging
from enum import Enum
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
......@@ -26,11 +25,11 @@ class KafkaConfig(Enum):
@staticmethod
def get_kafka_address() -> str:
kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
if kafka_server_address is None:
KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = get_setting('KFK_SERVER_PORT')
kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
# kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
# if kafka_server_address is None:
KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = get_setting('KFK_SERVER_PORT')
kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
return kafka_server_address
@staticmethod
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment