From 6a191bd306bfea36ac63a55a3d10da41bf28ce92 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Wed, 5 Jun 2024 14:53:34 +0000 Subject: [PATCH] after VM crash recovery. composer is able to match and read KPIs --- deploy/exporters.sh | 23 +++++++++++++++++++ deploy/kafka.sh | 4 ++-- ...ter.yaml => node_exporter_deployment.yaml} | 0 ...ervice.yaml => node_exporter_service.yaml} | 0 proto/telemetry_frontend.proto | 14 +++++------ src/kpi_manager/service/KpiValueComposer.py | 5 ++-- src/kpi_manager/tests/test_messages.py | 2 +- .../service/TelemetryBackendService.py | 10 ++++---- .../backend/tests/testTelemetryBackend.py | 4 ++-- 9 files changed, 44 insertions(+), 18 deletions(-) create mode 100644 deploy/exporters.sh rename manifests/{mock_nodeexporter.yaml => node_exporter_deployment.yaml} (100%) rename manifests/{mock_nodeexporterservice.yaml => node_exporter_service.yaml} (100%) diff --git a/deploy/exporters.sh b/deploy/exporters.sh new file mode 100644 index 000000000..6c56f25c9 --- /dev/null +++ b/deploy/exporters.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +######################################################################################################################## +# Read deployment settings +######################################################################################################################## + +# If not already set, set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE=${KFK_NAMESPACE:-"exporters"} + +# Add instruction of exporter automatic deployment here \ No newline at end of file diff --git a/deploy/kafka.sh b/deploy/kafka.sh index f2fb666b5..4be5ef6b2 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -60,10 +60,10 @@ echo ">>> Deploying Apache Kafka Broker" kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" echo ">>> Verifing Apache Kafka deployment" -sleep 5 +sleep 10 KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then - echo "Deployment Error: $KFK_PODS_STATUS" + echo "Deployment Error: \n $KFK_PODS_STATUS" else echo "$KFK_PODS_STATUS" fi \ No newline at end of file diff --git a/manifests/mock_nodeexporter.yaml b/manifests/node_exporter_deployment.yaml similarity index 100% rename from manifests/mock_nodeexporter.yaml rename to manifests/node_exporter_deployment.yaml diff --git a/manifests/mock_nodeexporterservice.yaml b/manifests/node_exporter_service.yaml similarity index 100% rename from manifests/mock_nodeexporterservice.yaml rename to manifests/node_exporter_service.yaml diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto index 1f89a5d54..48bfd7a0e 100644 --- a/proto/telemetry_frontend.proto +++ b/proto/telemetry_frontend.proto @@ -28,13 +28,13 @@ message CollectorFilter { // All fields empty means: list all Collectors repeated CollectorId collector_id = 1; repeated kpi_manager.KpiId kpi_id = 2; - repeated kpi_sample_types.KpiSampleType kpi_sample_type = 3; - repeated context.DeviceId device_id = 4; - repeated 
context.EndPointId endpoint_id = 5; - repeated context.ServiceId service_id = 6; - repeated context.SliceId slice_id = 7; - repeated context.ConnectionId connection_id = 8; - repeated context.LinkId link_id = 9; + // repeated kpi_sample_types.KpiSampleType kpi_sample_type = 3; + // repeated context.DeviceId device_id = 4; + // repeated context.EndPointId endpoint_id = 5; + // repeated context.ServiceId service_id = 6; + // repeated context.SliceId slice_id = 7; + // repeated context.ConnectionId connection_id = 8; + // repeated context.LinkId link_id = 9; } message CollectorList { diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py index 2710aac81..38c07a22a 100644 --- a/src/kpi_manager/service/KpiValueComposer.py +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -22,7 +22,8 @@ from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer LOGGER = logging.getLogger(__name__) -KAFKA_SERVER_IP = '127.0.0.1:9092' +KAFKA_SERVER_IP = '10.152.183.175:9092' +# KAFKA_SERVER_IP = '127.0.0.1:9092' # ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} @@ -65,7 +66,7 @@ class KpiValueComposer: try: new_event = receive_msg.value().decode('utf-8') # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) - LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.extract_kpi_values(new_event) except Exception as e: print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. 
Error detail: {str(e)}") diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index db6160be5..83150c102 100755 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -18,7 +18,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId,\ ConnectionId, EndPointId -# ---------------------- New Test Messages --------------------------------- +# ---------------------- 2nd iteration Test Messages --------------------------------- def create_kpi_id_request(): _kpi_id = kpi_manager_pb2.KpiId() _kpi_id.kpi_id.uuid = "34f73604-eca6-424f-9995-18b519ad0978" diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 4cfee8dba..60cfcc6e6 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -30,11 +30,12 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -KAFKA_SERVER_IP = '127.0.0.1:9092' +# KAFKA_SERVER_IP = '127.0.0.1:9092' +KAFKA_SERVER_IP = '10.152.183.175:9092' ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} -EXPORTER_ENDPOINT = "http://127.0.0.1:9100/metrics" +EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} @@ -68,7 +69,6 @@ class TelemetryBackendService: while True: receive_msg = consumerObj.poll(2.0) if receive_msg is None: - # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", 
KAFKA_TOPICS['request']) # added for debugging purposes continue elif receive_msg.error(): @@ -152,6 +152,7 @@ class TelemetryBackendService: if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print(f"Topic '{topic}' does not exist. Creating...") + LOGGER.info("Topic {:} does not exist. Creating...") new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) ADMIN_KAFKA_CLIENT.create_topics([new_topic]) except KafkaException as e: @@ -229,7 +230,8 @@ class TelemetryBackendService: try: while True: response = requests.get(EXPORTER_ENDPOINT) - LOGGER.info("Response Status {:} ".format(response)) + # print("Response Status {:} ".format(response)) + # LOGGER.info("Response Status {:} ".format(response)) try: if response.status_code == 200: producerObj = KafkaProducer(PRODUCER_CONFIG) diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index 7c3b7497b..e81e98473 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -18,7 +18,7 @@ sys.path.append('/home/tfs/tfs-ctrl') import threading import logging from typing import Tuple -from common.proto.context_pb2 import Empty +# from common.proto.context_pb2 import Empty from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService LOGGER = logging.getLogger(__name__) @@ -31,7 +31,7 @@ LOGGER = logging.getLogger(__name__) # def test_verify_kafka_topics(): # LOGGER.info('test_verify_kafka_topics requesting') # TelemetryBackendServiceObj = TelemetryBackendService() -# KafkaTopics = ['topic_request', 'topic_response'] +# KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] # response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) # LOGGER.debug(str(response)) # assert isinstance(response, bool) -- GitLab