Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (4)
......@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy QuestDB
./deploy/qdb.sh
# Deploy Apache Kafka
./deploy/kafka.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
......
......@@ -20,50 +20,69 @@
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}
# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}
# If not already set, if flag is YES, Apache Kafka will be redeployed and all topics will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
# Constants
TMP_FOLDER="./tmp"
KFK_MANIFESTS_PATH="manifests/kafka"
KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml"
KFK_MANIFEST="02-kafka.yaml"
# Constants
TMP_FOLDER="./tmp"
KFK_MANIFESTS_PATH="manifests/kafka"
KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml"
KFK_MANIFEST="02-kafka.yaml"
# Create a tmp folder for files modified during the deployment
TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests"
mkdir -p ${TMP_MANIFESTS_FOLDER}
# Create a tmp folder for files modified during the deployment
TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests"
mkdir -p ${TMP_MANIFESTS_FOLDER}
function kafka_deploy() {
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
# echo "Apache Kafka Namespace"
echo ">>> Delete Apache Kafka Namespace"
kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found
echo "Apache Kafka Namespace"
echo ">>> Delete Apache Kafka Namespace"
kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found
echo ">>> Create Apache Kafka Namespace"
kubectl create namespace ${KFK_NAMESPACE}
echo ">>> Create Apache Kafka Namespace"
kubectl create namespace ${KFK_NAMESPACE}
# echo ">>> Deplying Apache Kafka Zookeeper"
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
echo ">>> Deplying Apache Kafka Zookeeper"
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Verifing Apache Kafka deployment"
sleep 5
# KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods)
# if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then
# echo "Deployment Error: \n $KFK_PODS_STATUS"
# else
# echo "$KFK_PODS_STATUS"
# fi
}
echo ">>> Verifing Apache Kafka deployment"
sleep 10
KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods)
if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then
echo "Deployment Error: \n $KFK_PODS_STATUS"
echo "Apache Kafka"
echo ">>> Checking if Apache Kafka is deployed ... "
if [ "$KFK_REDEPLOY" == "YES" ]; then
kafka_deploy
elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then
echo ">>> Apache Kafka already present; skipping step."
else
echo "$KFK_PODS_STATUS"
fi
\ No newline at end of file
kafka_deploy
fi
echo
......@@ -115,6 +115,17 @@ export PROM_EXT_PORT_HTTP=${PROM_EXT_PORT_HTTP:-"9090"}
export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# ----- Apache Kafka ------------------------------------------------------
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}
# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}
# If not already set, if flag is YES, Apache Kafka will be redeployed and topic will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
......@@ -147,7 +158,7 @@ kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type=
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for KPI Management"
echo "Create secret with CockroachDB data for KPI Management microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
......@@ -159,6 +170,13 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices"
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
--from-literal=KFK_SERVER_PORT=${KFK_SERVER_PORT}
printf "\n"
echo "Create secret with NATS data"
NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}')
if [ -z "$NATS_CLIENT_PORT" ]; then
......
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30020"]
......
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30030"]
......
......@@ -33,7 +33,6 @@ from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
METRIC_WRITER = MetricWriterToPrometheus()
class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
......@@ -48,12 +47,14 @@ class KpiValueWriter(GenericGrpcService):
@staticmethod
def KafkaConsumer():
kpi_manager_client = KpiManagerClient()
metric_writer = MetricWriterToPrometheus()
kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
'group.id' : __class__,
'auto.offset.reset' : 'latest'}
)
kpi_manager_client = KpiManagerClient()
)
kafka_consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
......@@ -72,13 +73,13 @@ class KpiValueWriter(GenericGrpcService):
kpi_value.ParseFromString(raw_kpi.value())
LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client)
KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
except Exception as e:
print("Error detail: {:}".format(e))
continue
@staticmethod
def get_kpi_descriptor(kpi_value: str, kpi_manager_client ):
def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer):
print("--- START -----")
kpi_id = KpiId()
......@@ -89,12 +90,11 @@ class KpiValueWriter(GenericGrpcService):
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
# TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
# print("kpi descriptor received: {:}".format(kpi_descriptor_object))
# if isinstance (kpi_descriptor_object, KpiDescriptor):
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else:
LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
......
......@@ -14,11 +14,12 @@
import logging
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from kpi_value_writer.tests.test_messages import create_kpi_id_request, create_kpi_descriptor_request
from common.tools.kafka.Variables import KafkaTopic
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from common.proto.kpi_manager_pb2 import KpiDescriptor
from kpi_value_writer.tests.test_messages import create_kpi_id_request
from kpi_manager.client.KpiManagerClient import KpiManagerClient
LOGGER = logging.getLogger(__name__)
......