diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index afe1ee67be2edbeadb41aeb61d3ff24d3aa73576..ba58e31eff4b62ba3a9b0d209c935381c6badf36 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -12,10 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 from enum import Enum
+from confluent_kafka import KafkaException
+from confluent_kafka.admin import AdminClient, NewTopic
+
+
+LOGGER = logging.getLogger(__name__)
 
 class KafkaConfig(Enum):
-    SERVER_IP = "127.0.0.1:9092"
+    SERVER_IP    = "127.0.0.1:9092"
+    ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP})
 
 class KafkaTopic(Enum):
     REQUEST = 'topic_request'
@@ -24,4 +31,42 @@ class KafkaTopic(Enum):
     LABELED = 'topic_labeled'
     VALUE = 'topic_value'
 
+    @staticmethod
+    def create_all_topics() -> bool:
+        """
+        Method to create Kafka topics defined as class members.
+        """
+        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values()))
+        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys()))
+        # LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic]))
+        all_topics = [member.value for member in KafkaTopic]
+        if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
+            LOGGER.debug("All topics created successfully")
+            return True
+        else:
+            LOGGER.debug("Error creating all topics")
+            return False
+
+    @staticmethod
+    def create_new_topic_if_not_exists(new_topics: list) -> bool:
+        """
+        Method to create a Kafka topic if it does not exist.
+        Args:
+            new_topics (list): topic name(s) to be created on Kafka
+        """
+        LOGGER.debug("Received topic list: {:}".format(new_topics))
+        for topic in new_topics:
+            try:
+                topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
+                if topic not in topic_metadata.topics:
+                    # If the topic does not exist, create a new topic
+                    print(f"Topic '{topic}' does not exist. Creating...")
+                    LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
+                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
+                    KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
+            except Exception as e:
+                LOGGER.debug("Failed to create topic: {:}".format(e))
+                return False
+        return True
+
+# create all topics after the deployments (Telemetry and Analytics)
\ No newline at end of file
diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py
index 9085bc4683ba159bb64043e7b82173442f0a5bdd..9dd0f97cf91b887da851e8a088959c1da4f6fed8 100644
--- a/src/kpi_manager/service/__main__.py
+++ b/src/kpi_manager/service/__main__.py
@@ -31,7 +31,7 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     terminate.set()
 
 def start_kpi_manager(name_mapping : NameMapping):
-    LOGGER.info('Start Monitoring...',)
+    LOGGER.info('Start Kpi Manager...',)
     events_collector = EventsDeviceCollector(name_mapping)
     events_collector.start()
 
diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py
index 2f475cc0fdb03ad1e9da2b0614d5a2873d914923..968eafbfe4002b9a538047bad1954794afe8e68c 100755
--- a/src/kpi_manager/tests/test_kpi_manager.py
+++ b/src/kpi_manager/tests/test_kpi_manager.py
@@ -211,14 +211,14 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
 #     LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
 #     assert isinstance(del_response, Empty)
 
-# def test_GetKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
-#     # adding KPI
-#     response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     # get KPI
-#     response = kpi_manager_client.GetKpiDescriptor(response_id)
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiDescriptor)
+def test_GetKpiDescriptor(kpi_manager_client):
+    LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
+    # adding KPI
+    response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+    # get KPI
+    response = kpi_manager_client.GetKpiDescriptor(response_id)
+    LOGGER.info("Response gRPC message object: {:}".format(response))
+    assert isinstance(response, KpiDescriptor)
 
 # def test_SelectKpiDescriptor(kpi_manager_client):
 #     LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
@@ -229,12 +229,12 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
 #     LOGGER.info("Response gRPC message object: {:}".format(response))
 #     assert isinstance(response, KpiDescriptorList)
 
-def test_set_list_of_KPIs(kpi_manager_client):
-    LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
-    KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
-    # adding KPI
-    for kpi in KPIs_TO_SEARCH:
-        kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
+# def test_set_list_of_KPIs(kpi_manager_client):
+#     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
+#     KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
+#     # adding KPI
+#     for kpi in KPIs_TO_SEARCH:
+#         kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
 
 # ---------- 2nd Iteration Tests -----------------
 
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index e0b8b550a6f6ad0a7d1018e69705eee26180d241..ce1dd128280f12c761f4be5b9ebd7d811cfdc719 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -12,8 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, grpc
-from typing import Tuple, Any
+import logging, grpc, json
+from typing import Tuple, Any, List, Dict
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 
@@ -37,22 +37,30 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
     def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                        ) -> Empty:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
-        producer_obj = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_IP.value})
+
+        producer_obj = KafkaProducer({
+            'bootstrap.servers' : KafkaConfig.SERVER_IP.value
+        })
+
         for kpi_value in request.kpi_value_list:
             kpi_value_to_produce : Tuple [str, Any, Any] = (
                 kpi_value.kpi_id.kpi_id,            # kpi_value.kpi_id.kpi_id.uuid
                 kpi_value.timestamp,                # kpi_value.timestamp.timestamp
-                kpi_value.kpi_value_type            # kpi_value.kpi_value_type.(many options)
+                kpi_value.kpi_value_type            # kpi_value.kpi_value_type.(many options) how?
             )
             LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
             msg_key = "gRPC-KpiValueApi"        # str(__class__.__name__)
             # write this KPI to Kafka
-            producer_obj.produce(KafkaTopic.VALUE.value,
-                                 key      = msg_key,
-                                 value    = str(kpi_value_to_produce),
-                                 callback = self.delivery_callback
-                                )
+
+            producer_obj.produce(
+                KafkaTopic.VALUE.value,
+                key      = msg_key,
+                # value  = json.dumps(kpi_value_to_produce),
+                value    = kpi_value.SerializeToString(),
+                callback = self.delivery_callback
+            )
             producer_obj.flush()
+
         return Empty()
 
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
@@ -61,5 +69,5 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request))
 
     def delivery_callback(self, err, msg):
-        if err: print(f'Message delivery failed: {err}')
-        else: print(f'Message delivered to topic {msg.topic()}')
+        if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
+        else: print('Message delivered to topic {:}'.format(msg.topic()))
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py
index bfa9485a8aa322866f4ec46a5b8ed82758ee900e..519a61704c6e5d4ce27d9dc3be45e8ffe5b288cb 100644
--- a/src/kpi_value_api/tests/test_kpi_value_api.py
+++ b/src/kpi_value_api/tests/test_kpi_value_api.py
@@ -17,14 +17,17 @@
 import os, logging, pytest
 from common.proto.context_pb2 import Empty
 from common.Constants import ServiceNameEnum
+from common.tools.kafka.Variables import KafkaTopic
 from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
     get_env_var_name, get_service_port_grpc)
+
 from kpi_value_api.service.NameMapping import NameMapping
 from kpi_value_api.service.KpiValueApiService import KpiValueApiService
 from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
 from kpi_value_api.tests.messages import create_kpi_value_list
+
 LOCAL_HOST = '127.0.0.1'
 KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore
 
@@ -77,6 +80,12 @@ def kpi_value_api_client(kpi_value_api_service : KpiValueApiService ):
 # Tests Implementation of Kpi Value Api
 ###########################
 
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
+
+
 def test_store_kpi_values(kpi_value_api_client):
     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
     response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
diff --git a/src/kpi_value_writer/service/KpiValueComposer.py b/src/kpi_value_writer/service/KpiValueComposer.py
index 61e55812108023524c2e6fd60fc8a2b4c4968115..e2f315eda124e6a2d31208ff9194e8600694ceff 100644
--- a/src/kpi_value_writer/service/KpiValueComposer.py
+++ b/src/kpi_value_writer/service/KpiValueComposer.py
@@ -1,4 +1,4 @@
-# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..2142d76dfc177378e826d5ef20370b6b35693423 100644
--- a/src/kpi_value_writer/service/KpiValueWriter.py
+++ b/src/kpi_value_writer/service/KpiValueWriter.py
@@ -0,0 +1,127 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+import json
+import logging
+import threading
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+
+from common.proto.kpi_value_api_pb2 import KpiValue
+from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
+from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub
+
+from confluent_kafka import KafkaError
+from confluent_kafka import Consumer as KafkaConsumer
+
+from kpi_manager.client.KpiManagerClient import KpiManagerClient
+from monitoring.service.NameMapping import NameMapping
+from kpi_manager.service.KpiManagerService import KpiManagerService
+
+
+LOGGER = logging.getLogger(__name__)
+ACTIVE_CONSUMERS = []
+
+class KpiValueWriter:
+
+    @staticmethod
+    def RunKafkaConsumer():
+        thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
+        ACTIVE_CONSUMERS.append(thread)
+        thread.start()
+
+    @staticmethod
+    def KafkaConsumer():
+        kafka_consumer = KafkaConsumer(
+            { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
+              'group.id'          : __class__.__name__,
+              'auto.offset.reset' : 'latest'}
+        )
+        kafka_consumer.subscribe([KafkaTopic.VALUE.value])
+        LOGGER.debug("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
+        print("Kafka Consumer started listening on topic: {:}".format(KafkaTopic.VALUE.value))
+        while True:
+            raw_kpi = kafka_consumer.poll(1.0)
+            if raw_kpi is None:
+                continue
+            elif raw_kpi.error():
+                if raw_kpi.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    print("Consumer error: {}".format(raw_kpi.error()))
+                    continue
+            try:
+                kpi_value = KpiValue()
+                kpi_value.ParseFromString(raw_kpi.value())
+                LOGGER.debug("Received KPI : {:}".format(kpi_value))
+                print("Received KPI : {:}".format(kpi_value))
+                KpiValueWriter.get_kpi_descriptor_from_db(kpi_value.kpi_id.kpi_id.uuid)
+            except Exception as e:
+                print("Error detail: {:}".format(e))
+                continue
+
+    @staticmethod
+    def get_kpi_descriptor_from_db(kpi_value_uuid: str):
+        print("--- START -----")
+
+        kpi_id = KpiId()
+        kpi_id.kpi_id.uuid = kpi_value_uuid
+        print("KpiId generated: {:}".format(kpi_id))
+
+        kpi_manager_client = KpiManagerClient()
+        print("Kpi manager client created: {:}".format(kpi_manager_client))
+
+        try:
+            kpi_descriptor_object = KpiDescriptor()
+            kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
+
+            print("kpi descriptor received: {:}".format(kpi_descriptor_object))
+            if isinstance (kpi_descriptor_object, KpiDescriptor):
+                LOGGER.debug("Extracted row: {:}".format(kpi_descriptor_object))
+            else:
+                LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object))
+        except Exception as e:
+            print("Unable to get Descriptor. Error: {:}".format(e))
+
+
+def kpi_manager_service():
+    LOGGER.info('Initializing KpiManagerService...')
+    name_mapping = NameMapping()
+    # _service = MonitoringService(name_mapping)
+    _service = KpiManagerService(name_mapping)
+    _service.start()
+
+    # yield the server, when test finishes, execution will resume to stop it
+    LOGGER.info('Yielding KpiManagerService...')
+    yield _service
+
+    LOGGER.info('Terminating KpiManagerService...')
+    _service.stop()
+
+    LOGGER.info('Terminated KpiManagerService...')
+
+
+def kpi_manager_client_a(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
+    LOGGER.info('Initializing KpiManagerClient...')
+    _client = KpiManagerClient()
+
+    # yield the server, when test finishes, execution will resume to stop it
+    LOGGER.info('Yielding KpiManagerClient...')
+    yield _client
+
+    LOGGER.info('Closing KpiManagerClient...')
+    _client.close()
+
+    LOGGER.info('Closed KpiManagerClient...')
\ No newline at end of file
diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py
index 7b3362667b929840c481fc9c3088f8dda356bc30..a87a0b6ea7813dc03b2f070e369293de863d3696 100755
--- a/src/kpi_value_writer/tests/test_kpi_value_writer.py
+++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py
@@ -13,7 +13,18 @@
 # limitations under the License.
 
 import logging
-from kpi_manager.client.KpiManagerClient import KpiManagerClient
-from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
+from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
+from common.tools.kafka.Variables import KafkaTopic
+
 
 LOGGER = logging.getLogger(__name__)
+
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
+
+def test_KafkaConsumer():
+    LOGGER.debug(" >>> test_KafkaConsumer: START <<< ")
+    KpiValueWriter.RunKafkaConsumer()
+
\ No newline at end of file