Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (77)
Showing
with 342 additions and 266 deletions
......@@ -48,6 +48,6 @@ include:
- local: '/src/kpi_manager/.gitlab-ci.yml'
- local: '/src/kpi_value_api/.gitlab-ci.yml'
- local: '/src/kpi_value_writer/.gitlab-ci.yml'
- local: '/src/telemetry/.gitlab-ci.yml'
# This should be last one: end-to-end integration tests
- local: '/src/tests/.gitlab-ci.yml'
......@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy QuestDB
./deploy/qdb.sh
# Deploy Apache Kafka
./deploy/kafka.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
......
......@@ -20,50 +20,71 @@
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}
# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}
# If not already set, if flag is YES, Apache Kafka will be redeployed and all topics will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Constants
TMP_FOLDER="./tmp"
KFK_MANIFESTS_PATH="manifests/kafka"
KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml"
KFK_MANIFEST="02-kafka.yaml"
# Constants
TMP_FOLDER="./tmp"
KFK_MANIFESTS_PATH="manifests/kafka"
KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml"
KFK_MANIFEST="02-kafka.yaml"
# Create a tmp folder for files modified during the deployment
TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests"
mkdir -p ${TMP_MANIFESTS_FOLDER}
# Create a tmp folder for files modified during the deployment
TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests"
mkdir -p ${TMP_MANIFESTS_FOLDER}
function kafka_deploy() {
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
# echo "Apache Kafka Namespace"
echo ">>> Delete Apache Kafka Namespace"
kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found
echo "Apache Kafka Namespace"
echo ">>> Delete Apache Kafka Namespace"
kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found
echo ">>> Create Apache Kafka Namespace"
kubectl create namespace ${KFK_NAMESPACE}
echo ">>> Create Apache Kafka Namespace"
kubectl create namespace ${KFK_NAMESPACE}
# echo ">>> Deplying Apache Kafka Zookeeper"
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
echo ">>> Deplying Apache Kafka Zookeeper"
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Verifing Apache Kafka deployment"
sleep 5
# KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods)
# if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then
# echo "Deployment Error: \n $KFK_PODS_STATUS"
# else
# echo "$KFK_PODS_STATUS"
# fi
}
echo ">>> Verifing Apache Kafka deployment"
sleep 10
KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods)
if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then
echo "Deployment Error: \n $KFK_PODS_STATUS"
echo "Apache Kafka"
echo ">>> Checking if Apache Kafka is deployed ... "
if [ "$KFK_REDEPLOY" == "YES" ]; then
echo ">>> Redeploying kafka namespace"
kafka_deploy
elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then
echo ">>> Apache Kafka already present; skipping step."
else
echo "$KFK_PODS_STATUS"
fi
\ No newline at end of file
echo ">>> Kafka namespace doesn't exists. Deploying kafka namespace"
kafka_deploy
fi
echo
......@@ -115,6 +115,17 @@ export PROM_EXT_PORT_HTTP=${PROM_EXT_PORT_HTTP:-"9090"}
export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# ----- Apache Kafka ------------------------------------------------------
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}
# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}
# If not already set, if flag is YES, Apache Kafka will be redeployed and topic will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
......@@ -147,7 +158,7 @@ kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type=
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for KPI Management"
echo "Create secret with CockroachDB data for KPI Management microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
......@@ -159,6 +170,25 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for Telemetry microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka data for KPI and Telemetry microservices"
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
--from-literal=KFK_SERVER_PORT=${KFK_SERVER_PORT}
printf "\n"
echo "Create secret with NATS data"
NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}')
if [ -z "$NATS_CLIENT_PORT" ]; then
......@@ -234,15 +264,17 @@ for COMPONENT in $TFS_COMPONENTS; do
if [ "$COMPONENT" == "ztp" ] || [ "$COMPONENT" == "policy" ]; then
$DOCKER_BUILD -t "$COMPONENT:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG"
elif [ "$COMPONENT" == "pathcomp" ]; then
elif [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log"
$DOCKER_BUILD -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . > "$BUILD_LOG"
BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-backend.log"
$DOCKER_BUILD -t "$COMPONENT-backend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/backend/Dockerfile . > "$BUILD_LOG"
# next command is redundant, but helpful to keep cache updated between rebuilds
IMAGE_NAME="$COMPONENT-backend:$TFS_IMAGE_TAG-builder"
$DOCKER_BUILD -t "$IMAGE_NAME" --target builder -f ./src/"$COMPONENT"/backend/Dockerfile . >> "$BUILD_LOG"
if [ "$COMPONENT" == "pathcomp" ]; then
# next command is redundant, but helpful to keep cache updated between rebuilds
IMAGE_NAME="$COMPONENT-backend:$TFS_IMAGE_TAG-builder"
$DOCKER_BUILD -t "$IMAGE_NAME" --target builder -f ./src/"$COMPONENT"/backend/Dockerfile . >> "$BUILD_LOG"
fi
elif [ "$COMPONENT" == "dlt" ]; then
BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-connector.log"
$DOCKER_BUILD -t "$COMPONENT-connector:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/connector/Dockerfile . > "$BUILD_LOG"
......@@ -255,7 +287,7 @@ for COMPONENT in $TFS_COMPONENTS; do
echo " Pushing Docker image to '$TFS_REGISTRY_IMAGES'..."
if [ "$COMPONENT" == "pathcomp" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log"
......@@ -306,7 +338,7 @@ for COMPONENT in $TFS_COMPONENTS; do
cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST"
fi
if [ "$COMPONENT" == "pathcomp" ]; then
if [ "$COMPONENT" == "pathcomp" ] || [ "$COMPONENT" == "telemetry" ]; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT-frontend:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f4)
sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
......
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30020"]
......
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30030"]
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: telemetryservice
spec:
selector:
matchLabels:
app: telemetryservice
#replicas: 1
template:
metadata:
labels:
app: telemetryservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: frontend
image: labs.etsi.org:5050/tfs/controller/telemetry-frontend:latest
imagePullPolicy: Always
ports:
- containerPort: 30050
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: crdb-telemetry
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30050"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30050"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
- name: backend
image: labs.etsi.org:5050/tfs/controller/telemetry-backend:latest
imagePullPolicy: Always
ports:
- containerPort: 30060
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30060"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30060"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: telemetryservice
labels:
app: telemetryservice
spec:
type: ClusterIP
selector:
app: telemetryservice
ports:
- name: frontend-grpc
protocol: TCP
port: 30050
targetPort: 30050
- name: backend-grpc
protocol: TCP
port: 30060
targetPort: 30060
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: telemetryservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: telemetryservice
minReplicas: 1
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
#behavior:
# scaleDown:
# stabilizationWindowSeconds: 30
......@@ -181,3 +181,8 @@ export GRAF_EXT_PORT_HTTP="3000"
# Set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE="kafka"
# Set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT="9092"
# Set the flag to YES for redeploying of Apache Kafka
export KFK_REDEPLOY=""
......@@ -19,9 +19,9 @@ import "context.proto";
import "kpi_manager.proto";
service TelemetryFrontendService {
rpc StartCollector (Collector ) returns (CollectorId ) {}
rpc StopCollector (CollectorId ) returns (context.Empty) {}
rpc SelectCollectors(CollectorFilter) returns (CollectorList) {}
rpc StartCollector (Collector ) returns (CollectorId ) {}
rpc StopCollector (CollectorId ) returns (context.Empty) {}
rpc SelectCollectors (CollectorFilter) returns (CollectorList) {}
}
message CollectorId {
......@@ -29,10 +29,12 @@ message CollectorId {
}
message Collector {
CollectorId collector_id = 1; // The Collector ID
kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples
float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely
float interval_s = 4; // Interval between collected samples
CollectorId collector_id = 1; // The Collector ID
kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples
float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely
float interval_s = 4; // Interval between collected samples
context.Timestamp start_time = 5; // Timestamp when Collector start execution
context.Timestamp end_time = 6; // Timestamp when Collector stop execution
}
message CollectorFilter {
......
......@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_manager/tests/test_kpi_manager.py
......@@ -19,7 +19,8 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
KFK_SERVER_ADDRESS=${KAFKA_IP}:9092
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \
kpi_value_api/tests/test_kpi_value_api.py
......@@ -22,5 +22,5 @@ cd $PROJECTDIR/src
# kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-cli-level=INFO --verbose \
telemetry/database/tests/telemetryDBtests.py
python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
telemetry/tests/test_telemetryDB.py
......@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
telemetry/frontend/tests/test_frontend.py
......@@ -65,6 +65,7 @@ class ServiceNameEnum(Enum):
KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer'
TELEMETRYFRONTEND = 'telemetry-frontend'
TELEMETRYBACKEND = 'telemetry-backend'
# Used for test and debugging only
DLT_GATEWAY = 'dltgateway'
......@@ -98,6 +99,7 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.KPIVALUEAPI .value : 30020,
ServiceNameEnum.KPIVALUEWRITER .value : 30030,
ServiceNameEnum.TELEMETRYFRONTEND .value : 30050,
ServiceNameEnum.TELEMETRYBACKEND .value : 30060,
# Used for test and debugging only
ServiceNameEnum.DLT_GATEWAY .value : 50051,
......
......@@ -16,14 +16,29 @@ import logging
from enum import Enum
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
class KafkaConfig(Enum):
# SERVER_IP = "127.0.0.1:9092"
SERVER_IP = "kafka-service.kafka.svc.cluster.local:9092"
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP})
@staticmethod
def get_kafka_address() -> str:
kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None)
if kafka_server_address is None:
KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = get_setting('KFK_SERVER_PORT')
kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
return kafka_server_address
@staticmethod
def get_admin_client():
SERVER_ADDRESS = KafkaConfig.get_kafka_address()
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
return ADMIN_CLIENT
class KafkaTopic(Enum):
REQUEST = 'topic_request'
......@@ -38,8 +53,9 @@ class KafkaTopic(Enum):
Method to create Kafka topics defined as class members
"""
all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully")
LOGGER.debug("All topics are created sucsessfully or Already Exists")
return True
else:
LOGGER.debug("Error creating all topics")
......@@ -55,14 +71,14 @@ class KafkaTopic(Enum):
LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
for topic in new_topics:
try:
topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
KafkaConfig.get_admin_client().create_topics([new_topic])
else:
print("Topic name already exists: {:}".format(topic))
LOGGER.debug("Topic name already exists: {:}".format(topic))
......
# How to locally run and test KPI manager micro-service
## --- File links need to be updated. ---
### Pre-requisets
The following requirements should be fulfilled before the execuation of KPI management service.
Ensure the following requirements are met before executing the KPI management service:
1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully.
2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully.
3. Verify the creation of required database and table.
[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and
[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment.
1. A virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/requirements.in) sucessfully installed.
2. Verify the creation of required database and table. The
[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_db.py) python file lists the functions to create tables and the database. The
[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/database/KpiEngine.py) file contains the DB string.
### Messages format templates
["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing.
The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_messages.py) python file contains templates for creating gRPC messages.
### Test file
["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment.
### Unit test file
The ["KPI manager test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_manager.py) python file lists various tests conducted to validate functionality.
### Flow of execution (Kpi Maanager Service functions)
1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table.
2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor to the `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB.
3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from the DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from the DB.
4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types.
## For KPI composer and KPI writer
The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in).
4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types.
......@@ -34,14 +34,15 @@ class KpiDB:
def create_database(self) -> None:
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
# TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables.
try:
KpiModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
......@@ -69,8 +70,7 @@ class KpiDB:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key voilation: {:}".format(e)] )
raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
......@@ -89,7 +89,6 @@ class KpiDB:
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
......
......@@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
LOGGER.info('>>> test_verify_Tables : START <<< ')
kpiDBobj = KpiDB()
kpiDBobj.drop_database()
kpiDBobj.verify_tables()
# kpiDBobj.drop_database()
# kpiDBobj.verify_tables()
kpiDBobj.create_database()
kpiDBobj.create_tables()
kpiDBobj.verify_tables()
......@@ -17,7 +17,7 @@ import os, pytest
import logging
from typing import Union
#from common.proto.context_pb2 import Empty
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
......@@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
#from context.client.ContextClient import ContextClient
# from device.service.driver_api.DriverFactory import DriverFactory
# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
# from device.service.DeviceService import DeviceService
# from device.client.DeviceClient import DeviceClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
from kpi_manager.service.KpiManagerService import KpiManagerService
......@@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from kpi_manager.tests.test_messages import create_kpi_id_request
#from monitoring.service.NameMapping import NameMapping
#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
#from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
......@@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){}
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
......@@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
# @pytest.fixture(scope='session')
# def context_service():
# LOGGER.info('Initializing MockContextService...')
# _service = MockContextService(MOCKSERVICE_PORT)
# _service.start()
# LOGGER.info('Yielding MockContextService...')
# yield _service
# LOGGER.info('Terminating MockContextService...')
# _service.context_servicer.msg_broker.terminate()
# _service.stop()
# LOGGER.info('Terminated MockContextService...')
# @pytest.fixture(scope='session')
# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing ContextClient...')
# _client = ContextClient()
# LOGGER.info('Yielding ContextClient...')
# yield _client
# LOGGER.info('Closing ContextClient...')
# _client.close()
# LOGGER.info('Closed ContextClient...')
# @pytest.fixture(scope='session')
# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceService...')
# driver_factory = DriverFactory(DRIVERS)
# driver_instance_cache = DriverInstanceCache(driver_factory)
# _service = DeviceService(driver_instance_cache)
# _service.start()
# # yield the server, when test finishes, execution will resume to stop it
# LOGGER.info('Yielding DeviceService...')
# yield _service
# LOGGER.info('Terminating DeviceService...')
# _service.stop()
# LOGGER.info('Terminated DeviceService...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_manager_service():
LOGGER.info('Initializing KpiManagerService...')
#name_mapping = NameMapping()
# _service = MonitoringService(name_mapping)
# _service = KpiManagerService(name_mapping)
_service = KpiManagerService()
_service.start()
......@@ -181,35 +93,28 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
# Prepare Environment, should be the first test
##################################################
# # ERROR on this test ---
# def test_prepare_environment(
# context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument
# ):
# context_id = json_context_id(DEFAULT_CONTEXT_NAME)
# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
# context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
###########################
# Tests Implementation of Kpi Manager
###########################
# ---------- 3rd Iteration Tests ----------------
# def test_SetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiId)
def test_SetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiId)
# def test_DeleteKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# # adding KPI
# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # deleting KPI
# del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# # select KPI
# kpi_manager_client.GetKpiDescriptor(response_id)
# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
# assert isinstance(del_response, Empty)
def test_DeleteKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# adding KPI
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# deleting KPI
del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# select KPI
kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
assert isinstance(del_response, Empty)
def test_GetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
......@@ -225,77 +130,18 @@ def test_GetKpiDescriptor(kpi_manager_client):
assert isinstance(response, KpiDescriptor)
# def test_SelectKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# # adding KPI
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # select KPI(s)
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptorList)
# def test_set_list_of_KPIs(kpi_manager_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# # adding KPI
# for kpi in KPIs_TO_SEARCH:
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
# ---------- 2nd Iteration Tests -----------------
# def test_SetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
# with open("kpi_manager/tests/KPI_configs.json", 'r') as file:
# data = json.load(file)
# _descriptors = data.get('KPIs', [])
# for _descritor_name in _descriptors:
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name))
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiId)
# def test_GetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
# response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptor)
# def test_DeleteKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# del_response = kpi_manager_client.DeleteKpiDescriptor(response)
# kpi_manager_client.GetKpiDescriptor(response)
# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
# assert isinstance(del_response, Empty)
# def test_SelectKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a())
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptorList)
# ------------- INITIAL TESTs ----------------
# Test case that makes use of client fixture to test server's CreateKpi method
# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# # make call to server
# LOGGER.warning('test_create_kpi requesting')
# for i in range(3):
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1)))
# LOGGER.debug(str(response))
# assert isinstance(response, KpiId)
# # Test case that makes use of client fixture to test server's DeleteKpi method
# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# # make call to server
# LOGGER.warning('delete_kpi requesting')
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4'))
# response = kpi_manager_client.DeleteKpiDescriptor(response)
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
def test_SelectKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# adding KPI
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# select KPI(s)
response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiDescriptorList)
# # Test case that makes use of client fixture to test server's GetKpiDescriptor method
# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_selectkpidescritor begin')
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
# LOGGER.debug(str(response))
# assert isinstance(response, KpiDescriptorList)
def test_set_list_of_KPIs(kpi_manager_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# adding KPI
for kpi in KPIs_TO_SEARCH:
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
......@@ -50,10 +50,30 @@ unit_test kpi-value-api:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 30020:30020 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 20 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
- >
docker run --name $IMAGE_NAME -d -p 30020:30020
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
......@@ -74,7 +94,7 @@ unit_test kpi-value-api:
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
# - src/$IMAGE_NAME/tests/Dockerfile # mayne not needed
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
......