diff --git a/deploy/all.sh b/deploy/all.sh
index 97f4db37d53f4a7fcca850c51a5bfe6cc7653cb4..f3075949e036c5fee17969f20199c20ed7d983d3 100755
--- a/deploy/all.sh
+++ b/deploy/all.sh
@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
 # Deploy Apache Kafka
 ./deploy/kafka.sh
 
+# Deploy Monitoring (Prometheus, Mimir, Grafana)
+./deploy/monitoring.sh
+
 # Expose Dashboard
 ./deploy/expose_dashboard.sh
 
diff --git a/deploy/monitoring.sh b/deploy/monitoring.sh
new file mode 100644
index 0000000000000000000000000000000000000000..18992501a9ee852685f2a3599ec6002b515c3662
--- /dev/null
+++ b/deploy/monitoring.sh
@@ -0,0 +1,53 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+RELEASE_NAME="mon-prometheus"
+NAMESPACE="monitoring"
+CHART_REPO_NAME="prometheus-community"
+CHART_REPO_URL="https://prometheus-community.github.io/helm-charts"
+CHART_NAME="prometheus"     # Chart name within the repo
+VALUES_FILE="manifests/prometheus/prometheus.yaml"
+
+echo ">>> Deploying Prometheus with the following configuration:"
+echo "Adding/updating Helm repo: $CHART_REPO_NAME -> $CHART_REPO_URL"
+helm repo add "$CHART_REPO_NAME" "$CHART_REPO_URL" || true
+helm repo update
+
+echo "Creating namespace '$NAMESPACE' if it doesn't exist..."
+kubectl get namespace "$NAMESPACE" >/dev/null 2>&1 || kubectl create namespace "$NAMESPACE"
+
+#------------------------------------------------------------------------------
+# Install or upgrade the Prometheus chart
+#   - If 'VALUES_FILE' is set and the file exists, it is used for custom configuration.
+#   - Otherwise, the chart is deployed with its default values.
+#------------------------------------------------------------------------------
+if [ -n "$VALUES_FILE" ] && [ -f "$VALUES_FILE" ]; then
+    echo "Installing/Upgrading Prometheus with custom values from $VALUES_FILE..."
+    helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \
+        --namespace "$NAMESPACE" \
+        --values "$VALUES_FILE"
+else
+    echo "Installing/Upgrading Prometheus with default chart values..."
+    helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \
+        --namespace "$NAMESPACE"
+fi
+
+echo "Waiting for Prometheus pods to be ready..."
+kubectl rollout status deployment/"$RELEASE_NAME"-server -n "$NAMESPACE" || true
+
+# echo "Listing deployed resources in namespace '$NAMESPACE':"
+# kubectl get all -n "$NAMESPACE"
+
+echo "<<< Prometheus deployment completed successfully!"
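As a quick post-deployment check, the Prometheus readiness endpoint can be probed through the NodePort (30090) that manifests/prometheus/prometheus.yaml assigns to the server service. A minimal sketch using only the Python standard library; NODE_IP is a placeholder, not something the deploy scripts define:

```python
# Minimal readiness probe for the deployed Prometheus server.
# NODE_IP is a placeholder: replace it with the address of any cluster node.
# Port 30090 is the NodePort configured in manifests/prometheus/prometheus.yaml.
import urllib.request

NODE_IP = "127.0.0.1"  # placeholder node address

def prometheus_is_ready(node_ip: str = NODE_IP, port: int = 30090) -> bool:
    # Prometheus answers /-/ready with HTTP 200 once it can serve traffic.
    url = f"http://{node_ip}:{port}/-/ready"
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status == 200
    except OSError:
        return False

if __name__ == "__main__":
    print("Prometheus ready:", prometheus_is_ready())
```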
diff --git a/manifests/kpi_value_writerservice.yaml b/manifests/kpi_value_writerservice.yaml
index f98be462990fff4d678e41144511a284e2dd4f6c..27c61c9331606cc3e404a5b928696674a53356c0 100644
--- a/manifests/kpi_value_writerservice.yaml
+++ b/manifests/kpi_value_writerservice.yaml
@@ -39,6 +39,8 @@ spec:
         env:
           - name: LOG_LEVEL
             value: "INFO"
+          - name: PUSHGATEWAY_URL
+            value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091"
         envFrom:
           - secretRef:
               name: kfk-kpi-data
diff --git a/manifests/prometheus/prometheus.yaml b/manifests/prometheus/prometheus.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..fabc97c4a371ea47aff82a4bb310e56500aab991
--- /dev/null
+++ b/manifests/prometheus/prometheus.yaml
@@ -0,0 +1,52 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configuration for Prometheus components and server settings
+# Global Prometheus configuration
+alertmanager:
+  enabled: false              # Default is true
+kube-state-metrics:
+  enabled: false              # Default is true
+prometheus-node-exporter:
+  enabled: false              # Default is true
+prometheus-pushgateway:
+  enabled: true               # Default is true
+
+# Prometheus server-specific configuration
+server:
+  retention: "30d"
+  logLevel: "debug"
+  resources:
+    requests:
+      cpu: "250m"
+      memory: "256Mi"
+    limits:
+      cpu: "1"
+      memory: "1Gi"
+
+  # Expose the Prometheus server via a Kubernetes service
+  service:
+    type: NodePort
+    nodePort: 30090
+
+  extraScrapeConfigs:
+    - job_name: 'pushgateway'
+      static_configs:
+        - targets:
+            - 'prometheus-pushgateway.monitoring.svc.cluster.local:9091'   # Push Gateway endpoint
+
+  # Global Prometheus settings:
+  global:
+    scrape_interval: 10s
+    evaluation_interval: 10s
diff --git a/scripts/run_tests_locally-kpi-value-writer.sh b/scripts/run_tests_locally-kpi-value-writer.sh
index cbeed3b784a2316a3261ee7950bb5e6cffbb7fbf..e3d9c7c6a419483cf0ce9e066ba67e5d4ccefed4 100755
--- a/scripts/run_tests_locally-kpi-value-writer.sh
+++ b/scripts/run_tests_locally-kpi-value-writer.sh
@@ -19,6 +19,7 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
+
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_value_writer/tests/test_kpi_value_writer.py
diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index f648a62520f2f7b23f30edb19bf54735f5d13e12..1b4915d7476311d1ceb1693a0934278b44516f22 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 
-python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
+python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
     telemetry/backend/tests/test_backend.py
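The PUSHGATEWAY_URL plumbing introduced above can be exercised independently of Kafka and the writer. A minimal sketch that pushes a dummy gauge through the same prometheus_client call the writer uses; the address is an assumption (e.g. after kubectl port-forward -n monitoring svc/mon-prometheus-prometheus-pushgateway 9091:9091), and the metric and job names are made up for this check:

```python
# Push a dummy gauge to the Pushgateway so it shows up under the
# 'pushgateway' scrape job defined in manifests/prometheus/prometheus.yaml.
from prometheus_client import CollectorRegistry, Gauge
from prometheus_client.exposition import push_to_gateway

GATEWAY_URL = "127.0.0.1:9091"  # assumed address, e.g. a port-forwarded Pushgateway

registry = CollectorRegistry()
gauge = Gauge(
    "tfs_smoke_test_metric",  # hypothetical metric name for this check only
    "Dummy value used to verify that the Pushgateway is reachable",
    registry=registry,
)
gauge.set(42.0)

# Same call MetricWriterToPrometheus performs, grouped under its own job name.
push_to_gateway(GATEWAY_URL, job="smoke-test", registry=registry)
print("Pushed test metric to", GATEWAY_URL)
```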
diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh
index 38822330ec3837ac1a101e2a7d46f4928c4b31e6..e70818377ed4c7021da0222a831c6f7d319398c7 100755
--- a/scripts/run_tests_locally-telemetry-frontend.sh
+++ b/scripts/run_tests_locally-telemetry-frontend.sh
@@ -18,10 +18,11 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
-# python3 kpi_manager/tests/test_unitary.py
 
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+
+python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
     telemetry/frontend/tests/test_frontend.py
diff --git a/src/common/tools/client/RetryDecorator.py b/src/common/tools/client/RetryDecorator.py
index 4750ff73ae4342ce2eb2a31941ff48b46e5be281..efc8b52348e6becdec13d4929e1ae9b4f3ad428f 100644
--- a/src/common/tools/client/RetryDecorator.py
+++ b/src/common/tools/client/RetryDecorator.py
@@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None):
         return delay
     return compute
 
-def delay_exponential(initial=1, increment=1, maximum=None):
+def delay_exponential(initial=1.0, increment=1.0, maximum=None):
     def compute(num_try):
         delay = initial * pow(increment, (num_try - 1))
         if maximum is not None:
diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py
index 5f55c2cfcfd3c5c65aa317d02376dd6971fba384..ebe13b661ffe34543808b386c2d1a3b76823e455 100644
--- a/src/kpi_manager/tests/test_messages.py
+++ b/src/kpi_manager/tests/test_messages.py
@@ -77,4 +77,4 @@ def create_kpi_filter_request():
     _create_kpi_filter_request.connection_id.append(connection_id_obj)
     _create_kpi_filter_request.link_id.append(link_id_obj)
 
-    return _create_kpi_filter_request
\ No newline at end of file
+    return _create_kpi_filter_request
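The switch to float defaults in delay_exponential makes the computed delays floats even when callers pass integers. To illustrate the schedule it produces, here is a standalone re-implementation of the compute logic with illustrative arguments (not the decorator's defaults):

```python
# Standalone sketch of the exponential backoff schedule: delay grows as
# initial * increment**(num_try - 1), optionally capped at maximum.
def delay_exponential(initial=1.0, increment=1.0, maximum=None):
    def compute(num_try):
        delay = initial * pow(increment, (num_try - 1))
        if maximum is not None:
            delay = min(delay, maximum)  # cap the backoff
        return delay
    return compute

compute = delay_exponential(initial=1.0, increment=2.0, maximum=10.0)
print([compute(n) for n in range(1, 6)])  # [1.0, 2.0, 4.0, 8.0, 10.0]
```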
diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py
index 0bc95355e35e6deab8ba79eeeb87e278b1b2ecd2..25b8ca2e8a38091aa211e07c23062c5a666b1a94 100644
--- a/src/kpi_value_writer/service/KpiValueWriter.py
+++ b/src/kpi_value_writer/service/KpiValueWriter.py
@@ -15,25 +15,21 @@
 import json
 import logging
 import threading
+
+from confluent_kafka import KafkaError
+from confluent_kafka import Consumer as KafkaConsumer
+
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
-from common.proto.kpi_value_api_pb2 import KpiValue
 from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
 from common.Settings import get_service_port_grpc
 from common.Constants import ServiceNameEnum
 from common.tools.service.GenericGrpcService import GenericGrpcService
-
-from confluent_kafka import KafkaError
-from confluent_kafka import Consumer as KafkaConsumer
-
 from kpi_manager.client.KpiManagerClient import KpiManagerClient
-# -- test import --
-# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
 from .MetricWriterToPrometheus import MetricWriterToPrometheus
 
-LOGGER           = logging.getLogger(__name__)
-ACTIVE_CONSUMERS = []
+LOGGER = logging.getLogger(__name__)
 
 class KpiValueWriter(GenericGrpcService):
     def __init__(self, cls_name : str = __name__) -> None:
@@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService):
             'group.id'          : 'KpiValueWriter',
             'auto.offset.reset' : 'latest'})
 
-    def RunKafkaConsumer(self):
+    def install_servicers(self):
         thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
-        ACTIVE_CONSUMERS.append(thread)
         thread.start()
 
     def KafkaKpiConsumer(self):
@@ -55,7 +50,6 @@
         consumer = self.kafka_consumer
         consumer.subscribe([KafkaTopic.VALUE.value])
         LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
-        print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
         while True:
             raw_kpi = consumer.poll(1.0)
             if raw_kpi is None:
@@ -69,30 +63,21 @@
             try:
                 kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
                 LOGGER.info("Received KPI : {:}".format(kpi_value))
-                print("Received KPI : {:}".format(kpi_value))
                 self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
-            except Exception as e:
-                print("Error detail: {:}".format(e))
+            except Exception:
+                LOGGER.exception("Error while processing the received KPI")
                 continue
 
     def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer):
-        print("--- START -----")
-
         kpi_id = KpiId()
-        kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
-        print("KpiId generated: {:}".format(kpi_id))
-        # print("Kpi manger client created: {:}".format(kpi_manager_client))
+        kpi_id.kpi_id.uuid = kpi_value['kpi_id']    # type: ignore
         try:
             kpi_descriptor_object = KpiDescriptor()
             kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
-            # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
             if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
                 LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
-                print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                 metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
             else:
-                LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
-                print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
+                LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id))
         except Exception as e:
             LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
-            print ("Unable to get KpiDescriptor. Error: {:}".format(e))
diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py
index bfbb6e3bab9770719f2fc23b3fab00e2805b074a..3238516c9bec562f9891a6a16003f9e547eef03b 100644
--- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py
+++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py
@@ -14,15 +14,20 @@
 # read Kafka stream from Kafka topic
 
+import os
 import logging
-from prometheus_client import Gauge
-from common.proto.kpi_sample_types_pb2 import KpiSampleType
-from common.proto.kpi_value_api_pb2 import KpiValue
-from common.proto.kpi_manager_pb2 import KpiDescriptor
+from prometheus_client import Gauge
+from prometheus_client.exposition import push_to_gateway
+from prometheus_client.registry import CollectorRegistry
+
+from common.proto.kpi_sample_types_pb2 import KpiSampleType
+from common.proto.kpi_value_api_pb2 import KpiValue
+from common.proto.kpi_manager_pb2 import KpiDescriptor
 
-LOGGER = logging.getLogger(__name__)
-PROM_METRICS = {}
+LOGGER       = logging.getLogger(__name__)
+PROM_METRICS = {}
+GATEWAY_URL  = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091')
 
 class MetricWriterToPrometheus:
     '''
@@ -30,7 +35,9 @@ class MetricWriterToPrometheus:
     cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
     '''
     def __init__(self):
-        pass
+        self.job_name    = 'kpivaluewriter'
+        self.registry    = CollectorRegistry()
+        self.gateway_url = GATEWAY_URL
 
     def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
         # Creating a dictionary from the kpi_descriptor's attributes
@@ -44,26 +51,27 @@ class MetricWriterToPrometheus:
             'slice_id'      : kpi_descriptor.slice_id.slice_uuid.uuid,
             'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
             'link_id'       : kpi_descriptor.link_id.link_uuid.uuid,
-            'time_stamp'    : kpi_value.timestamp.timestamp,
-            'kpi_value'     : kpi_value.kpi_value_type.floatVal
+            'time_stamp'    : kpi_value["time_stamp"],
+            'kpi_value'     : kpi_value["kpi_value"]
         }
         LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
         return cooked_kpi
 
     def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
         # merge both gRPC messages into single varible.
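With RunKafkaConsumer renamed to install_servicers, the consumer thread now starts as part of the GenericGrpcService lifecycle, so feeding the writer only requires producing to the VALUE topic. A minimal sketch of such a producer, assuming it runs from src/ with KFK_SERVER_ADDRESS exported (as the local test scripts do) and that a matching KpiDescriptor is already registered in the KPI Manager:

```python
# Hypothetical test producer: emit one KPI value JSON onto the topic the
# KpiValueWriter consumes. The payload keys mirror what KafkaKpiConsumer
# and get_kpi_descriptor read: 'kpi_id', 'time_stamp', and 'kpi_value'.
import json
import os
import time
import uuid

from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaTopic

producer = Producer({'bootstrap.servers': os.getenv('KFK_SERVER_ADDRESS', '127.0.0.1:9092')})

test_kpi = {
    'kpi_id'    : str(uuid.uuid4()),  # should match a registered KpiDescriptor UUID
    'time_stamp': time.time(),
    'kpi_value' : 42.0,
}
producer.produce(KafkaTopic.VALUE.value, json.dumps(test_kpi).encode('utf-8'))
producer.flush()
```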
-        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
+        cooked_kpi      = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
         tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
-        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]    # These values will be used as metric tags
-        metric_name = cooked_kpi['kpi_sample_type']
+        metric_tags     = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]    # These values will be used as metric tags
+        metric_name     = cooked_kpi['kpi_sample_type']
         try:
             if metric_name not in PROM_METRICS:    # Only register the metric, when it doesn't exists
                 PROM_METRICS[metric_name] = Gauge (
                     metric_name,
                     cooked_kpi['kpi_description'],
-                    metric_tags
+                    metric_tags,
+                    registry=self.registry
                 )
-            LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
+                LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
             PROM_METRICS[metric_name].labels(
                 kpi_id        = cooked_kpi['kpi_id'],
                 device_id     = cooked_kpi['device_id'],
@@ -74,7 +82,11 @@
                 link_id       = cooked_kpi['link_id'],
                 time_stamp    = cooked_kpi['time_stamp'],
             ).set(float(cooked_kpi['kpi_value']))
-            LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
+            LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name]))
+
+            # Push to the Prometheus Gateway; Prometheus is preconfigured to scrape the metrics from the gateway
+            push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
+            LOGGER.debug("Metric pushed to Prometheus Gateway.")
 
         except ValueError as e:
             if 'Duplicated timeseries' in str(e):
diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py
index 28ba2ac90f1e9ed28dfeeeda6b6da17568a124e7..56fc6100d391eec5953b24d882397c3ef7a2f130 100644
--- a/src/kpi_value_writer/service/__main__.py
+++ b/src/kpi_value_writer/service/__main__.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import logging, signal, sys, threading
-from prometheus_client import start_http_server
 from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
 from common.Settings import get_log_level
 
@@ -39,8 +38,6 @@ def main():
     grpc_service = KpiValueWriter()
     grpc_service.start()
 
-    start_http_server(10808)
-    LOGGER.debug("Prometheus client is started on port 10808")
 
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
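Since __main__.py no longer starts an HTTP server, the Pushgateway is now the only path by which KPI values reach Prometheus. A hypothetical standalone check of that path, bypassing Kafka and the KPI Manager entirely; it assumes execution from src/ with PUSHGATEWAY_URL exported and reachable, and reuses the descriptor values from the test messages below:

```python
# Sketch: cook one KPI directly from a KpiDescriptor plus the plain-dict
# value shape KafkaKpiConsumer produces, then push it to the gateway.
import time

from common.proto.kpi_manager_pb2 import KpiDescriptor
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from kpi_value_writer.service.MetricWriterToPrometheus import MetricWriterToPrometheus

descriptor = KpiDescriptor()
descriptor.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"  # as in test_messages.py
descriptor.kpi_description = "Test Description"
descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED

writer = MetricWriterToPrometheus()
writer.create_and_expose_cooked_kpi(
    descriptor, {'time_stamp': time.time(), 'kpi_value': 42.0})
```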
diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py
index 0d3f9e683db5430fe9214cbf4131dcc38912da85..29e81d28ae8da9f9a2602be8b36cee368ee3d87b 100755
--- a/src/kpi_value_writer/tests/test_kpi_value_writer.py
+++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py
@@ -12,14 +12,35 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import pytest
+import time
 import logging
 from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
+from kpi_manager.client.KpiManagerClient import KpiManagerClient
 from common.tools.kafka.Variables import KafkaTopic
+from test_messages import create_kpi_descriptor_request
 
+LOGGER = logging.getLogger(__name__)
+
+# -------- Fixtures ----------------
+
+@pytest.fixture(autouse=True)
+def log_all_methods(request):
+    '''
+    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
+    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
+    '''
+    LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
+    yield
+    LOGGER.info(f" <<<<< Finished test: {request.node.name} ")
+
+# @pytest.fixture(scope='module')
+# def kpi_manager_client():
+#     LOGGER.debug("Yielding KpiManagerClient ...")
+#     yield KpiManagerClient(host="10.152.183.203")
+#     LOGGER.debug("KpiManagerClient is terminated.")
 
-LOGGER = logging.getLogger(__name__)
 # -------- Initial Test ----------------
 def test_validate_kafka_topics():
@@ -27,7 +48,15 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_KafkaConsumer():
-    LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
-    # kpi_value_writer = KpiValueWriter()
-    # kpi_value_writer.RunKafkaConsumer()
+# --------------
+# NOT FOR GITHUB PIPELINE (Local testing only)
+# --------------
+# def test_KafkaConsumer(kpi_manager_client):
+
+#     # kpidescriptor = create_kpi_descriptor_request()
+#     # kpi_manager_client.SetKpiDescriptor(kpidescriptor)
+
+#     kpi_value_writer = KpiValueWriter()
+#     kpi_value_writer.KafkaKpiConsumer()
+#     LOGGER.debug(" waiting for timer to finish ")
+#     time.sleep(300)
diff --git a/src/kpi_value_writer/tests/test_messages.py b/src/kpi_value_writer/tests/test_messages.py
index ffc6b398c4ff6405fe1ac8eec086553fa6fbe193..4cd901b2c8b28e13f6ff0f373d3c0de6201a4c96 100755
--- a/src/kpi_value_writer/tests/test_messages.py
+++ b/src/kpi_value_writer/tests/test_messages.py
@@ -25,7 +25,8 @@ def create_kpi_id_request():
 
 def create_kpi_descriptor_request(description: str = "Test Description"):
     _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
-    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _create_kpi_request.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
     _create_kpi_request.kpi_description = description
     _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
     _create_kpi_request.device_id.device_uuid.uuid = 'DEV4'