From fca7587e762136aeb7c7103ee8c93057e5d5ee35 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Fri, 26 Jul 2024 13:56:17 +0000
Subject: [PATCH 1/3] Multiple bug Fixes for KPI Management.

- fixes deployment of KPI Writer and API
---
 deploy/tfs.sh                                 |   4 +-
 scripts/run_tests_locally-kpi-DB.sh           |   2 +
 scripts/run_tests_locally-kpi-manager.sh      |   2 +
 scripts/run_tests_locally-kpi-prom-writer.sh  |   2 +
 src/kpi_manager/database/KpiEngine.py         |   4 +-
 src/kpi_manager/database/Kpi_DB.py            |   6 +-
 .../service/KpiManagerServiceServicerImpl.py  |   9 +-
 src/kpi_value_api/service/__main__.py         |   1 -
 .../service/KpiValueWriter.py                 |  29 +++--
 src/kpi_value_writer/service/KpiWriterOld.py  | 108 ------------------
 10 files changed, 32 insertions(+), 135 deletions(-)
 delete mode 100644 src/kpi_value_writer/service/KpiWriterOld.py

diff --git a/deploy/tfs.sh b/deploy/tfs.sh
index f61cdb991..62f36a2c1 100755
--- a/deploy/tfs.sh
+++ b/deploy/tfs.sh
@@ -340,7 +340,7 @@ for COMPONENT in $TFS_COMPONENTS; do
     echo " Deploying '$COMPONENT' component to Kubernetes..."
     DEPLOY_LOG="$TMP_LOGS_FOLDER/deploy_${COMPONENT}.log"
     kubectl --namespace $TFS_K8S_NAMESPACE apply -f "$MANIFEST" > "$DEPLOY_LOG"
-    COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/")
+    COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/g")
     #kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=0 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG"
     #kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=1 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG"
 
@@ -391,7 +391,7 @@ printf "\n"
 
 for COMPONENT in $TFS_COMPONENTS; do
     echo "Waiting for '$COMPONENT' component..."
-    COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/")
+    COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/g")
     kubectl wait --namespace $TFS_K8S_NAMESPACE \
         --for='condition=available' --timeout=90s deployment/${COMPONENT_OBJNAME}service
     WAIT_EXIT_CODE=$?
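The added 'g' flag matters because several component names carry more than one underscore (e.g. kpi_value_api, kpi_value_writer), and Kubernetes object names follow DNS-1123 rules, which do not allow '_' at all. A minimal Python sketch of the intended mapping, illustrative only and not part of the patch:

    # Mirrors COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/g") in deploy/tfs.sh.
    def k8s_objname(component: str) -> str:
        # Every underscore must become a hyphen; the result is used as
        # deployment/<objname>service in the kubectl wait call above.
        return component.replace("_", "-")

    assert k8s_objname("kpi_value_writer") == "kpi-value-writer"            # new behaviour: global substitution
    assert "kpi_value_writer".replace("_", "-", 1) == "kpi-value_writer"    # old behaviour: only the first '_' replaced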
diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh
index d43be66e1..4953b49e0 100755
--- a/scripts/run_tests_locally-kpi-DB.sh
+++ b/scripts/run_tests_locally-kpi-DB.sh
@@ -24,5 +24,7 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_manager/tests/test_kpi_db.py
diff --git a/scripts/run_tests_locally-kpi-manager.sh b/scripts/run_tests_locally-kpi-manager.sh
index db6e78683..a6a24f90d 100755
--- a/scripts/run_tests_locally-kpi-manager.sh
+++ b/scripts/run_tests_locally-kpi-manager.sh
@@ -24,5 +24,7 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_manager/tests/test_kpi_manager.py
diff --git a/scripts/run_tests_locally-kpi-prom-writer.sh b/scripts/run_tests_locally-kpi-prom-writer.sh
index 1179cbf86..8865a8a34 100755
--- a/scripts/run_tests_locally-kpi-prom-writer.sh
+++ b/scripts/run_tests_locally-kpi-prom-writer.sh
@@ -19,5 +19,7 @@ PROJECTDIR=`pwd`
 cd $PROJECTDIR/src
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_value_writer/tests/test_metric_writer_to_prom.py
diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py
index 42bda9527..dff406de6 100644
--- a/src/kpi_manager/database/KpiEngine.py
+++ b/src/kpi_manager/database/KpiEngine.py
@@ -27,11 +27,11 @@ class KpiEngine:
         if crdb_uri is None:
             CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
             CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = get_setting('CRDB_DATABASE')
+            CRDB_DATABASE  = 'tfs_kpi_mgmt'    # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
             CRDB_USERNAME  = get_setting('CRDB_USERNAME')
             CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
             CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
-        crdb_uri = CRDB_URI_TEMPLATE.format(
+            crdb_uri = CRDB_URI_TEMPLATE.format(
                 CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
         # crdb_uri = CRDB_URI_TEMPLATE.format(
         #     CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py
index 5b2b586b6..dcd28489b 100644
--- a/src/kpi_manager/database/Kpi_DB.py
+++ b/src/kpi_manager/database/Kpi_DB.py
@@ -18,10 +18,10 @@ from sqlalchemy.orm import sessionmaker
 from kpi_manager.database.KpiEngine import KpiEngine
 from kpi_manager.database.KpiModel import Kpi as KpiModel
 from common.method_wrappers.ServiceExceptions import (
-    AlreadyExistsException, OperationFailedException)
+    AlreadyExistsException, OperationFailedException , NotFoundException)
 
 LOGGER = logging.getLogger(__name__)
-DB_NAME = "kpi"
+DB_NAME = "tfs_kpi_mgmt"
 
 class KpiDB:
     def __init__(self):
@@ -86,7 +86,7 @@ class KpiDB:
                 return entity
             else:
                 LOGGER.debug(f"{model.__name__} ID not found: {str(id_to_search)}")
-                return None
+                raise NotFoundException (model.__name__, id_to_search, extra_details=["Row not found with ID"] )
         except Exception as e:
             session.rollback()
             LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
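The test scripts and KpiEngine.py above converge on the same CockroachDB URI shape: CRDB_URI from the environment wins, otherwise the URI is assembled from the individual CRDB_* settings with the database name now pinned to 'tfs_kpi_mgmt'. A rough sketch of that resolution order; the exact CRDB_URI_TEMPLATE string is assumed here (only its argument order is visible in the hunk), and get_setting() is approximated with os.environ:

    import os

    # Assumed template; the real one lives in src/kpi_manager/database/KpiEngine.py.
    CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

    def resolve_crdb_uri() -> str:
        crdb_uri = os.environ.get('CRDB_URI')    # exported by the run_tests_locally-kpi-* scripts
        if crdb_uri is None:
            # Fallback mirroring the patched get_engine(): database name fixed to 'tfs_kpi_mgmt'.
            crdb_uri = CRDB_URI_TEMPLATE.format(
                os.environ['CRDB_USERNAME'], os.environ['CRDB_PASSWORD'],
                os.environ['CRDB_NAMESPACE'], os.environ['CRDB_SQL_PORT'],
                'tfs_kpi_mgmt', os.environ['CRDB_SSLMODE'])
        return crdb_uri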
"tfs_kpi_mgmt" class KpiDB: def __init__(self): @@ -86,7 +86,7 @@ class KpiDB: return entity else: LOGGER.debug(f"{model.__name__} ID not found: {str(id_to_search)}") - return None + raise NotFoundException (model.__name__, id_to_search, extra_details=["Row not found with ID"] ) except Exception as e: session.rollback() LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 05292fc5b..bea2c78b4 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -52,13 +52,8 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): try: kpi_id_to_search = request.kpi_id.uuid row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) - if row is not None: - response = KpiModel.convert_row_to_KpiDescriptor(row) - return response - if row is None: - print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search)) - LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) - return Empty() + response = KpiModel.convert_row_to_KpiDescriptor(row) + return response except Exception as e: print ('Unable to search kpi id. {:}'.format(e)) LOGGER.info('Unable to search kpi id. {:}'.format(e)) diff --git a/src/kpi_value_api/service/__main__.py b/src/kpi_value_api/service/__main__.py index 8b4ebe296..f0f265a48 100644 --- a/src/kpi_value_api/service/__main__.py +++ b/src/kpi_value_api/service/__main__.py @@ -13,7 +13,6 @@ # limitations under the License. import logging, signal, sys, threading -from prometheus_client import start_http_server from common.Settings import get_log_level from .KpiValueApiService import KpiValueApiService diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index a4b10ed63..26bab4465 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -17,20 +17,29 @@ import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId +from common.Settings import get_service_port_grpc +from common.Constants import ServiceNameEnum +from common.tools.service.GenericGrpcService import GenericGrpcService + from confluent_kafka import KafkaError from confluent_kafka import Consumer as KafkaConsumer from kpi_manager.client.KpiManagerClient import KpiManagerClient # -- test import -- -from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request +# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request from .MetricWriterToPrometheus import MetricWriterToPrometheus LOGGER = logging.getLogger(__name__) ACTIVE_CONSUMERS = [] +METRIC_WRITER = MetricWriterToPrometheus() + +class KpiValueWriter(GenericGrpcService): + def __init__(self, cls_name : str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER) + super().__init__(port, cls_name=cls_name) -class KpiValueWriter: @staticmethod def RunKafkaConsumer(): thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) @@ -44,11 +53,7 @@ class KpiValueWriter: 'group.id' : __class__, 'auto.offset.reset' : 'latest'} ) - - metric_writer_to_prom = MetricWriterToPrometheus() kpi_manager_client = KpiManagerClient() - print("Kpi manger client created: 
{:}".format(kpi_manager_client)) - kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) @@ -84,15 +89,15 @@ class KpiValueWriter: try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) - - print("kpi descriptor received: {:}".format(kpi_descriptor_object)) - if isinstance (kpi_descriptor_object, KpiDescriptor): + if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: + # print("kpi descriptor received: {:}".format(kpi_descriptor_object)) + # if isinstance (kpi_descriptor_object, KpiDescriptor): LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) - MetricWriterToPrometheus.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) + METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: - LOGGER.info("Error in extracting KpiDescriptor {:}".format(kpi_descriptor_object)) - print("Error in extracting KpiDescriptor {:}".format(kpi_descriptor_object)) + LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) except Exception as e: LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) print ("Unable to get KpiDescriptor. Error: {:}".format(e)) diff --git a/src/kpi_value_writer/service/KpiWriterOld.py b/src/kpi_value_writer/service/KpiWriterOld.py deleted file mode 100644 index b9a4316b0..000000000 --- a/src/kpi_value_writer/service/KpiWriterOld.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# read Kafka stream from Kafka topic - -import ast -import time -import threading -from confluent_kafka import KafkaError -from prometheus_client import start_http_server, Gauge, CollectorRegistry -from confluent_kafka import Consumer as KafkaConsumer - -KAFKA_SERVER_IP = '127.0.0.1:9092' -KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', - 'raw' : 'topic_raw' , 'labeled' : 'topic_labeled'} -CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'kpi_writer', - 'auto.offset.reset' : 'latest'} -KPIs_TO_SEARCH = ["node_network_receive_packets_total", - "node_network_receive_bytes_total", - "node_network_transmit_bytes_total", - "process_open_fds"] -PROM_METRICS = {} -KAFKA_REGISTERY = CollectorRegistry() - -class KpiWriter: - def __init__(self) -> None: - pass - - @staticmethod - def kpi_writer(): - KpiWriter.create_prom_metrics_name() - threading.Thread(target=KpiWriter.kafka_listener, args=()).start() - - @staticmethod - def kafka_listener(): - """ - listener for events on Kafka topic. - """ - # Start up the server to expose the metrics at port number mention below. 
-        start_http_server(8101, registry=KAFKA_REGISTERY)
-        kafka_consumer = KafkaConsumer(CONSUMER_CONFIG)
-        kafka_consumer.subscribe([KAFKA_TOPICS['labeled']])
-        while True:
-            receive_msg = kafka_consumer.poll(2.0)
-            if receive_msg is None:
-                # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['raw'])  # added for debugging purposes
-                continue
-            elif receive_msg.error():
-                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
-                    continue
-                else:
-                    print("Consumer error: {}".format(receive_msg.error()))
-                    continue
-            try:
-                new_event = receive_msg.value().decode('utf-8')
-                # print("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))
-                # LOGGER.info("New event on topic '{:}' is {:}".format(KAFKA_TOPICS['raw'], new_event))
-                KpiWriter.write_metric_to_promtheus(new_event)
-            except Exception as e:
-                print(f"Error to consume event from topic: {KAFKA_TOPICS['labeled']}. Error detail: {str(e)}")
-                continue
-
-    # send metric to Prometheus
-    @staticmethod
-    def write_metric_to_promtheus(event):
-        event = ast.literal_eval(event)    # converted into dict
-        print("New recevied event: {:}".format(event))
-        event_kpi_name = event['kpi_description']
-        if event_kpi_name in KPIs_TO_SEARCH:
-            PROM_METRICS[event_kpi_name].labels(
-                kpi_id          = event['kpi_id'],
-                kpi_sample_type = event['kpi_sample_type'],
-                device_id       = event['device_id'],
-                endpoint_id     = event['endpoint_id'],
-                service_id      = event['service_id'],
-                slice_id        = event['slice_id'],
-                connection_id   = event['connection_id'],
-                link_id         = event['link_id']
-            ).set(float(event['kpi_value']))
-        time.sleep(0.05)
-
-    @staticmethod
-    def create_prom_metrics_name():
-        metric_tags = ['kpi_id','kpi_sample_type','device_id',
-                       'endpoint_id','service_id','slice_id','connection_id','link_id']
-        for metric_key in KPIs_TO_SEARCH:
-            metric_name        = metric_key
-            metric_description = "description of " + str(metric_key)
-            try:
-                PROM_METRICS[metric_key] = Gauge (
-                    metric_name, metric_description, metric_tags,
-                    registry=KAFKA_REGISTERY )
-                # print("Metric pushed to Prometheus: {:}".format(PROM_METRICS[metric_key]))
-            except ValueError as e:
-                if 'Duplicated timeseries' in str(e):
-                    print("Metric {:} is already registered. Skipping.".format(metric_name))
--
GitLab
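The deleted KpiWriterOld.py duplicated what MetricWriterToPrometheus is now expected to provide: one labelled Gauge per KPI name, guarded against double registration. The pattern, reduced to a sketch over the prometheus_client API (names below chosen for illustration):

    from prometheus_client import CollectorRegistry, Gauge

    REGISTRY = CollectorRegistry()
    METRICS  = {}

    def expose_kpi(name: str, value: float, **labels):
        if name not in METRICS:
            # Creating the same metric name twice in one registry raises
            # ValueError('Duplicated timeseries ...'), hence the cache.
            METRICS[name] = Gauge(name, 'description of ' + name, list(labels.keys()), registry=REGISTRY)
        METRICS[name].labels(**labels).set(value)

    expose_kpi('node_network_receive_packets_total', 1234.0, kpi_id='kpi-uuid', device_id='dev-1')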
Skipping.".format(metric_name)) -- GitLab From 2ea824bf0eb18090afdcc9b14ff1ac48d03819e3 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 26 Jul 2024 14:02:52 +0000 Subject: [PATCH 2/3] KPI Management: - Activated CI/CD build and unitary test --- .gitlab-ci.yml | 1 + src/kpi_manager/.gitlab-ci.yml | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e2d653e03..59e0f0043 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -45,6 +45,7 @@ include: #- local: '/src/dlt/.gitlab-ci.yml' - local: '/src/load_generator/.gitlab-ci.yml' - local: '/src/bgpls_speaker/.gitlab-ci.yml' + - local: '/src/kpi_manager/.gitlab-ci.yml' # This should be last one: end-to-end integration tests - local: '/src/tests/.gitlab-ci.yml' diff --git a/src/kpi_manager/.gitlab-ci.yml b/src/kpi_manager/.gitlab-ci.yml index 6aef328ea..7d3870036 100644 --- a/src/kpi_manager/.gitlab-ci.yml +++ b/src/kpi_manager/.gitlab-ci.yml @@ -68,8 +68,6 @@ unit_test kpi-manager: - docker ps -a - CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") - echo $CRDB_ADDRESS - - NATS_ADDRESS=$(docker inspect nats --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") - - echo $NATS_ADDRESS - > docker run --name $IMAGE_NAME -d -p 30010:30010 --env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require" -- GitLab From c5e197135fd975e0f98533695ab662a132b66f53 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 26 Jul 2024 14:34:52 +0000 Subject: [PATCH 3/3] KPI Management: - Activated CI/CD build and unitary test --- .gitlab-ci.yml | 2 ++ src/kpi_manager/.gitlab-ci.yml | 4 ++-- src/kpi_value_api/.gitlab-ci.yml | 4 ++-- src/kpi_value_writer/.gitlab-ci.yml | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 59e0f0043..0c5ff9325 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -46,6 +46,8 @@ include: - local: '/src/load_generator/.gitlab-ci.yml' - local: '/src/bgpls_speaker/.gitlab-ci.yml' - local: '/src/kpi_manager/.gitlab-ci.yml' + - local: '/src/kpi_value_api/.gitlab-ci.yml' + - local: '/src/kpi_value_writer/.gitlab-ci.yml' # This should be last one: end-to-end integration tests - local: '/src/tests/.gitlab-ci.yml' diff --git a/src/kpi_manager/.gitlab-ci.yml b/src/kpi_manager/.gitlab-ci.yml index 7d3870036..498cfd89f 100644 --- a/src/kpi_manager/.gitlab-ci.yml +++ b/src/kpi_manager/.gitlab-ci.yml @@ -15,7 +15,7 @@ # Build, tag, and push the Docker image to the GitLab Docker registry build kpi-manager: variables: - IMAGE_NAME: 'kpi-manager' # name of the microservice + IMAGE_NAME: 'kpi_manager' # name of the microservice IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) stage: build before_script: @@ -41,7 +41,7 @@ build kpi-manager: # Apply unit test to the component unit_test kpi-manager: variables: - IMAGE_NAME: 'kpi-manager' # name of the microservice + IMAGE_NAME: 'kpi_manager' # name of the microservice IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) stage: unit_test needs: diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml index c9107abaa..166e9d3cb 100644 --- a/src/kpi_value_api/.gitlab-ci.yml +++ b/src/kpi_value_api/.gitlab-ci.yml @@ -15,7 +15,7 @@ # Build, tag, and push the Docker image to the GitLab Docker registry build kpi-value-api: variables: - IMAGE_NAME: 'kpi-value-api' # name of the microservice + IMAGE_NAME: 'kpi_value_api' # name of the 
From c5e197135fd975e0f98533695ab662a132b66f53 Mon Sep 17 00:00:00 2001
From: gifrerenom
Date: Fri, 26 Jul 2024 14:34:52 +0000
Subject: [PATCH 3/3] KPI Management:

- Activated CI/CD build and unitary test
---
 .gitlab-ci.yml                      | 2 ++
 src/kpi_manager/.gitlab-ci.yml      | 4 ++--
 src/kpi_value_api/.gitlab-ci.yml    | 4 ++--
 src/kpi_value_writer/.gitlab-ci.yml | 4 ++--
 4 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 59e0f0043..0c5ff9325 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -46,6 +46,8 @@ include:
   - local: '/src/load_generator/.gitlab-ci.yml'
   - local: '/src/bgpls_speaker/.gitlab-ci.yml'
   - local: '/src/kpi_manager/.gitlab-ci.yml'
+  - local: '/src/kpi_value_api/.gitlab-ci.yml'
+  - local: '/src/kpi_value_writer/.gitlab-ci.yml'
 
   # This should be last one: end-to-end integration tests
   - local: '/src/tests/.gitlab-ci.yml'
diff --git a/src/kpi_manager/.gitlab-ci.yml b/src/kpi_manager/.gitlab-ci.yml
index 7d3870036..498cfd89f 100644
--- a/src/kpi_manager/.gitlab-ci.yml
+++ b/src/kpi_manager/.gitlab-ci.yml
@@ -15,7 +15,7 @@
 # Build, tag, and push the Docker image to the GitLab Docker registry
 build kpi-manager:
   variables:
-    IMAGE_NAME: 'kpi-manager' # name of the microservice
+    IMAGE_NAME: 'kpi_manager' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: build
   before_script:
@@ -41,7 +41,7 @@ build kpi-manager:
 # Apply unit test to the component
 unit_test kpi-manager:
   variables:
-    IMAGE_NAME: 'kpi-manager' # name of the microservice
+    IMAGE_NAME: 'kpi_manager' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: unit_test
   needs:
diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml
index c9107abaa..166e9d3cb 100644
--- a/src/kpi_value_api/.gitlab-ci.yml
+++ b/src/kpi_value_api/.gitlab-ci.yml
@@ -15,7 +15,7 @@
 # Build, tag, and push the Docker image to the GitLab Docker registry
 build kpi-value-api:
   variables:
-    IMAGE_NAME: 'kpi-value-api' # name of the microservice
+    IMAGE_NAME: 'kpi_value_api' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: build
   before_script:
@@ -41,7 +41,7 @@ build kpi-value-api:
 # Apply unit test to the component
 unit_test kpi-value-api:
   variables:
-    IMAGE_NAME: 'kpi-value-api' # name of the microservice
+    IMAGE_NAME: 'kpi_value_api' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: unit_test
   needs:
diff --git a/src/kpi_value_writer/.gitlab-ci.yml b/src/kpi_value_writer/.gitlab-ci.yml
index 52b1b8fe6..25619ce7f 100644
--- a/src/kpi_value_writer/.gitlab-ci.yml
+++ b/src/kpi_value_writer/.gitlab-ci.yml
@@ -15,7 +15,7 @@
 # Build, tag, and push the Docker image to the GitLab Docker registry
 build kpi-value-writer:
   variables:
-    IMAGE_NAME: 'kpi-value-writer' # name of the microservice
+    IMAGE_NAME: 'kpi_value_writer' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: build
   before_script:
@@ -41,7 +41,7 @@ build kpi-value-writer:
 # Apply unit test to the component
 unit_test kpi-value-writer:
   variables:
-    IMAGE_NAME: 'kpi-value-writer' # name of the microservice
+    IMAGE_NAME: 'kpi_value_writer' # name of the microservice
     IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
   stage: unit_test
   needs:
--
GitLab
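The renamed unit_test jobs keep feeding the component a CockroachDB URI through docker run --env CRDB_URI=..., which the KpiEngine change in the first patch picks up at runtime. A sketch of that consumption path, loosely mirroring what KpiEngine.get_engine() is expected to do (the cockroachdb:// scheme assumes the sqlalchemy-cockroachdb dialect is installed):

    import os, sqlalchemy

    def make_engine():
        # CRDB_URI comes either from the CI job ('tfs_test' database) or from the
        # run_tests_locally-* scripts ('tfs_kpi_mgmt' database).
        crdb_uri = os.environ['CRDB_URI']
        try:
            return sqlalchemy.create_engine(crdb_uri, echo=False)
        except Exception as e:
            print('Failed to create database engine: {:}'.format(e))
            return None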