diff --git a/deploy/exporters.sh b/deploy/exporters.sh new file mode 100644 index 0000000000000000000000000000000000000000..6c56f25c9a3e51dbb4e38d71b149960d6c3108fe --- /dev/null +++ b/deploy/exporters.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Read deployment settings +######################################################################################################################## + +# If not already set, set the namespace where Apache Kafka will be deployed. +export KFK_NAMESPACE=${KFK_NAMESPACE:-"exporters"} + +# Add instruction of exporter automatic deployment here \ No newline at end of file diff --git a/deploy/kafka.sh b/deploy/kafka.sh index f2fb666b545b86e36d7647a4e4e1de19731caa8d..4be5ef6b220a2059d642e550856580d833ab6e0e 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -60,10 +60,10 @@ echo ">>> Deploying Apache Kafka Broker" kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST" echo ">>> Verifing Apache Kafka deployment" -sleep 5 +sleep 10 KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods) if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then - echo "Deployment Error: $KFK_PODS_STATUS" + echo "Deployment Error: \n $KFK_PODS_STATUS" else echo "$KFK_PODS_STATUS" fi \ No newline at end of file diff --git a/manifests/mock_nodeexporter.yaml b/manifests/node_exporter_deployment.yaml similarity index 100% rename from manifests/mock_nodeexporter.yaml rename to manifests/node_exporter_deployment.yaml diff --git a/manifests/mock_nodeexporterservice.yaml b/manifests/node_exporter_service.yaml similarity index 100% rename from manifests/mock_nodeexporterservice.yaml rename to manifests/node_exporter_service.yaml diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto index 1f89a5d544dec837a7233296839b0f9eb5f4989f..48bfd7a0e17501a5fb78fa5e180b11b79fdd735d 100644 --- a/proto/telemetry_frontend.proto +++ b/proto/telemetry_frontend.proto @@ -28,13 +28,13 @@ message CollectorFilter { // All fields empty means: list all Collectors repeated CollectorId collector_id = 1; repeated kpi_manager.KpiId kpi_id = 2; - repeated kpi_sample_types.KpiSampleType kpi_sample_type = 3; - repeated context.DeviceId device_id = 4; - repeated context.EndPointId endpoint_id = 5; - repeated context.ServiceId service_id = 6; - repeated context.SliceId slice_id = 7; - repeated context.ConnectionId connection_id = 8; - repeated context.LinkId link_id = 9; + // repeated kpi_sample_types.KpiSampleType kpi_sample_type = 3; + // repeated context.DeviceId device_id = 4; + // repeated context.EndPointId endpoint_id = 5; + // repeated context.ServiceId service_id = 6; + // repeated context.SliceId slice_id = 7; + // repeated context.ConnectionId connection_id = 8; + // repeated context.LinkId link_id = 9; } message CollectorList { diff --git a/src/kpi_manager/service/KpiValueComposer.py b/src/kpi_manager/service/KpiValueComposer.py index 2710aac81aa0d8084651221bd23b9320d1329a92..38c07a22acd418a2fa91cfea90b1c813ad64a7e2 100644 --- a/src/kpi_manager/service/KpiValueComposer.py +++ b/src/kpi_manager/service/KpiValueComposer.py @@ -22,7 +22,8 @@ from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer LOGGER = logging.getLogger(__name__) -KAFKA_SERVER_IP = '127.0.0.1:9092' +KAFKA_SERVER_IP = '10.152.183.175:9092' +# KAFKA_SERVER_IP = '127.0.0.1:9092' # ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} @@ -65,7 +66,7 @@ class KpiValueComposer: try: new_event = receive_msg.value().decode('utf-8') # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) - LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) + # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event)) KpiValueComposer.extract_kpi_values(new_event) except Exception as e: print(f"Error to consume event from topic: {KAFKA_TOPICS['raw']}. Error detail: {str(e)}") diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index db6160be5fe35b1b2b8d55b73d2ff0400d189978..83150c102be541184de220551df23ecaed4c3b41 100755 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -18,7 +18,7 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId,\ ConnectionId, EndPointId -# ---------------------- New Test Messages --------------------------------- +# ---------------------- 2nd iteration Test Messages --------------------------------- def create_kpi_id_request(): _kpi_id = kpi_manager_pb2.KpiId() _kpi_id.kpi_id.uuid = "34f73604-eca6-424f-9995-18b519ad0978" diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 4cfee8dba534fef4ae9b12599d332cc0a495a54c..60cfcc6e60bbef710e2856aa6fb329347bff2036 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -30,11 +30,12 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -KAFKA_SERVER_IP = '127.0.0.1:9092' +# KAFKA_SERVER_IP = '127.0.0.1:9092' +KAFKA_SERVER_IP = '10.152.183.175:9092' ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} -EXPORTER_ENDPOINT = "http://127.0.0.1:9100/metrics" +EXPORTER_ENDPOINT = "http://10.152.183.2:9100/metrics" PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} @@ -68,7 +69,6 @@ class TelemetryBackendService: while True: receive_msg = consumerObj.poll(2.0) if receive_msg is None: - # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes continue elif receive_msg.error(): @@ -152,6 +152,7 @@ class TelemetryBackendService: if topic not in topic_metadata.topics: # If the topic does not exist, create a new topic print(f"Topic '{topic}' does not exist. Creating...") + LOGGER.info("Topic {:} does not exist. Creating...") new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) ADMIN_KAFKA_CLIENT.create_topics([new_topic]) except KafkaException as e: @@ -229,7 +230,8 @@ class TelemetryBackendService: try: while True: response = requests.get(EXPORTER_ENDPOINT) - LOGGER.info("Response Status {:} ".format(response)) + # print("Response Status {:} ".format(response)) + # LOGGER.info("Response Status {:} ".format(response)) try: if response.status_code == 200: producerObj = KafkaProducer(PRODUCER_CONFIG) diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index 7c3b7497bf2b94ba350a0b193fbf76e49a67bcec..e81e98473b36b31687b67d68bd51e96fbe6e4613 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -18,7 +18,7 @@ sys.path.append('/home/tfs/tfs-ctrl') import threading import logging from typing import Tuple -from common.proto.context_pb2 import Empty +# from common.proto.context_pb2 import Empty from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService LOGGER = logging.getLogger(__name__) @@ -31,7 +31,7 @@ LOGGER = logging.getLogger(__name__) # def test_verify_kafka_topics(): # LOGGER.info('test_verify_kafka_topics requesting') # TelemetryBackendServiceObj = TelemetryBackendService() -# KafkaTopics = ['topic_request', 'topic_response'] +# KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] # response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) # LOGGER.debug(str(response)) # assert isinstance(response, bool)