diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index ba58e31eff4b62ba3a9b0d209c935381c6badf36..a6f996932011f6838c8a1e96a325a6a9d885fbd9 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -36,9 +36,6 @@ class KafkaTopic(Enum):
         """
             Method to create Kafka topics defined as class members
         """
-        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values()))
-        # LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys()))
-        # LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic]))
         all_topics = [member.value for member in KafkaTopic]
         if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
             LOGGER.debug("All topics created sucsessfully")
@@ -54,16 +51,20 @@ class KafkaTopic(Enum):
         Args:
             list of topic: containing the topic name(s) to be created on Kafka
         """
-        LOGGER.debug("Recevied topic List: {:}".format(new_topics))
+        LOGGER.debug("Topic names to be verified and created: {:}".format(new_topics))
         for topic in new_topics:
             try:
                 topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
+                # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
                 if topic not in topic_metadata.topics:
                     # If the topic does not exist, create a new topic
-                    print(f"Topic '{topic}' does not exist. Creating...")
+                    print("Topic {:} does not exist. Creating...".format(topic))
                     LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
                     new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
                     KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
+                else:
+                    print("Topic name already exists: {:}".format(topic))
+                    LOGGER.debug("Topic name already exists: {:}".format(topic))
             except Exception as e:
                 LOGGER.debug("Failed to create topic: {:}".format(e))
                 return False
diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py
index 50b5048679e6b45a57187f2e4fbfca154a63280a..132a152194adb7780f35550483e14a2414df1cf8 100644
--- a/src/kpi_manager/service/__main__.py
+++ b/src/kpi_manager/service/__main__.py
@@ -29,35 +29,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     LOGGER.warning('Terminate signal received')
     terminate.set()
 
-# def start_kpi_manager(name_mapping : NameMapping):
-#     LOGGER.info('Start Kpi Manager...',)
-
-#     events_collector = EventsDeviceCollector(name_mapping)
-#     events_collector.start()
-
-#     # TODO: redesign this method to be more clear and clean
-
-#     # Iterate while terminate is not set
-#     while not terminate.is_set():
-#         list_new_kpi_ids = events_collector.listen_events()
-
-#         # Monitor Kpis
-#         if bool(list_new_kpi_ids):
-#             for kpi_id in list_new_kpi_ids:
-#                 # Create Monitor Kpi Requests
-#                 monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
-#                 monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
-#                 monitor_kpi_request.monitoring_window_s = 86400
-#                 monitor_kpi_request.sampling_rate_s = 10
-#                 events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
-
-#         time.sleep(0.5) # let other tasks run; do not overload CPU
-#     else:
-#         # Terminate is set, looping terminates
-#         LOGGER.warning("Stopping execution...")
-
-#     events_collector.start()
-
 def main():
     global LOGGER # pylint: disable=global-statement
 
@@ -77,21 +48,11 @@
 
     LOGGER.debug('Starting...')
 
-    # Start metrics server
-    metrics_port = get_metrics_port()
-    start_http_server(metrics_port)
-
     name_mapping = NameMapping()
 
-    # Starting monitoring service
-    # grpc_service = MonitoringService(name_mapping)
-    # grpc_service.start()
-    # start_monitoring(name_mapping)
     grpc_service = KpiManagerService(name_mapping)
     grpc_service.start()
 
-    # start_kpi_manager(name_mapping)
-
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
 
diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..a642b5e5884a4303506413cdb4e69c5d184f1385
--- /dev/null
+++ b/src/kpi_value_api/requirements.in
@@ -0,0 +1,2 @@
+confluent-kafka==2.3.0
+requests==2.27.1
\ No newline at end of file
diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
index 1d3b9bdfd62f5cef34be152c723cfe511d30934c..3ecf20c08a6e280ce346273e5c154e0cdd565c1d 100644
--- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
+++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py
@@ -105,19 +105,16 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
             return KpiValueType(int64Val=int64_value)
         except ValueError:
             pass
-        # Check if the value is a float
         try:
             float_value = float(value)
            return KpiValueType(floatVal=float_value)
         except ValueError:
             pass
-        # Check if the value is a boolean
         if value.lower() in ['true', 'false']:
             bool_value = value.lower() == 'true'
             return KpiValueType(boolVal=bool_value)
-        # If none of the above, treat it as a string
         return KpiValueType(stringVal=value)
diff --git a/src/kpi_value_api/service/__main__.py b/src/kpi_value_api/service/__main__.py
new file mode 100644
index 0000000000000000000000000000000000000000..1a8707112601f317ab111ff19329d00620ad3284
--- /dev/null
+++ b/src/kpi_value_api/service/__main__.py
@@ -0,0 +1,55 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, signal, sys, threading, time
+from prometheus_client import start_http_server
+from common.Settings import get_log_level
+from .NameMapping import NameMapping # import updated
+from .KpiValueApiService import KpiValueApiService
+
+terminate = threading.Event()
+LOGGER = None
+
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
+    LOGGER.warning('Terminate signal received')
+    terminate.set()
+
+def main():
+    global LOGGER # pylint: disable=global-statement
+
+    log_level = get_log_level()
+    logging.basicConfig(level=log_level)
+    LOGGER = logging.getLogger(__name__)
+
+    signal.signal(signal.SIGINT,  signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    LOGGER.debug('Starting...')
+
+    name_mapping = NameMapping()
+
+    grpc_service = KpiValueApiService(name_mapping)
+    grpc_service.start()
+
+    # Wait for Ctrl+C or termination signal
+    while not terminate.wait(timeout=1.0): pass
+
+    LOGGER.debug('Terminating...')
+    grpc_service.stop()
+
+    LOGGER.debug('Bye')
+    return 0
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py
index 519a61704c6e5d4ce27d9dc3be45e8ffe5b288cb..6c2858659d75fbf6f9f7df1e691db91797d50701 100644
--- a/src/kpi_value_api/tests/test_kpi_value_api.py
+++ b/src/kpi_value_api/tests/test_kpi_value_api.py
@@ -90,3 +90,4 @@ def test_store_kpi_values(kpi_value_api_client):
     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
     response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
     assert isinstance(response, Empty)
+
diff --git a/src/kpi_value_writer/requirements.in b/src/kpi_value_writer/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..a642b5e5884a4303506413cdb4e69c5d184f1385
--- /dev/null
+++ b/src/kpi_value_writer/requirements.in
@@ -0,0 +1,2 @@
+confluent-kafka==2.3.0
+requests==2.27.1
\ No newline at end of file
diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py
index 9085bc4683ba159bb64043e7b82173442f0a5bdd..3e1d8989f4492c47f1e78e57a107ccb033d3f4c0 100644
--- a/src/kpi_value_writer/service/__main__.py
+++ b/src/kpi_value_writer/service/__main__.py
@@ -12,16 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, signal, sys, threading, time
+import logging, signal, sys, threading
 from prometheus_client import start_http_server
-from common.Constants import ServiceNameEnum
-from common.Settings import (
-    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
-    wait_for_environment_variables)
-from common.proto import monitoring_pb2
-from monitoring.service.EventTools import EventsDeviceCollector # import updated
-from monitoring.service.NameMapping import NameMapping # import updated
-from .KpiManagerService import KpiManagerService
+from .NameMapping import NameMapping # import updated
+from .KpiValueWriter import KpiValueWriter
+from common.Settings import get_log_level, get_metrics_port
 
 terminate = threading.Event()
 LOGGER = None
@@ -30,35 +25,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     LOGGER.warning('Terminate signal received')
     terminate.set()
 
-def start_kpi_manager(name_mapping : NameMapping):
-    LOGGER.info('Start Monitoring...',)
-
-    events_collector = EventsDeviceCollector(name_mapping)
-    events_collector.start()
-
-    # TODO: redesign this method to be more clear and clean
-
-    # Iterate while terminate is not set
-    while not terminate.is_set():
-        list_new_kpi_ids = events_collector.listen_events()
-
-        # Monitor Kpis
-        if bool(list_new_kpi_ids):
-            for kpi_id in list_new_kpi_ids:
-                # Create Monitor Kpi Requests
-                monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
-                monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
-                monitor_kpi_request.monitoring_window_s = 86400
-                monitor_kpi_request.sampling_rate_s = 10
-                events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
-
-        time.sleep(0.5) # let other tasks run; do not overload CPU
-    else:
-        # Terminate is set, looping terminates
-        LOGGER.warning("Stopping execution...")
-
-    events_collector.start()
-
 def main():
     global LOGGER # pylint: disable=global-statement
 
@@ -66,40 +32,25 @@
     log_level = get_log_level()
     logging.basicConfig(level=log_level)
     LOGGER = logging.getLogger(__name__)
 
-    wait_for_environment_variables([
-        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
-        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
-        get_env_var_name(ServiceNameEnum.DEVICE,  ENVVAR_SUFIX_SERVICE_HOST     ),
-        get_env_var_name(ServiceNameEnum.DEVICE,  ENVVAR_SUFIX_SERVICE_PORT_GRPC),
-    ])
-
     signal.signal(signal.SIGINT,  signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    LOGGER.info('Starting...')
+    LOGGER.debug('Starting...')
-    # Start metrics server
-    metrics_port = get_metrics_port()
-    start_http_server(metrics_port)
+    start_http_server(get_metrics_port()) # expose Prometheus metrics on the configured port
 
     name_mapping = NameMapping()
 
-    # Starting monitoring service
-    # grpc_service = MonitoringService(name_mapping)
-    # grpc_service.start()
-    # start_monitoring(name_mapping)
-    grpc_service = KpiManagerService(name_mapping)
+    grpc_service = KpiValueWriter(name_mapping)
     grpc_service.start()
 
-    start_kpi_manager(name_mapping)
-
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
 
-    LOGGER.info('Terminating...')
+    LOGGER.debug('Terminating...')
     grpc_service.stop()
 
-    LOGGER.info('Bye')
+    LOGGER.debug('Bye')
     return 0
 
 if __name__ == '__main__':
diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py
index 0a57c7416e13e6f2e491f47a68aba14a264b84e9..572495d48d70cdc40c0ef6bb1efcf877e2a610ee 100755
--- a/src/kpi_value_writer/tests/test_kpi_value_writer.py
+++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py
@@ -22,31 +22,31 @@ from kpi_value_writer.tests.test_messages import create_kpi_id_request
 
 LOGGER = logging.getLogger(__name__)
 
-def test_GetKpiDescriptor():
-    LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
-    kpi_manager_client = KpiManagerClient()
-    # adding KPI
-    LOGGER.info(" --->>> calling SetKpiDescriptor ")
-    response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-    # get KPI
-    LOGGER.info(" --->>> calling GetKpiDescriptor with response ID")
-    response = kpi_manager_client.GetKpiDescriptor(response_id)
-    LOGGER.info("Response gRPC message object: {:}".format(response))
+# def test_GetKpiDescriptor():
+#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
+#     kpi_manager_client = KpiManagerClient()
+#     # adding KPI
+#     LOGGER.info(" --->>> calling SetKpiDescriptor ")
+#     response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+#     # get KPI
+#     LOGGER.info(" --->>> calling GetKpiDescriptor with response ID")
+#     response = kpi_manager_client.GetKpiDescriptor(response_id)
+#     LOGGER.info("Response gRPC message object: {:}".format(response))
 
-    LOGGER.info(" --->>> calling GetKpiDescriptor with random ID")
-    rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
-    LOGGER.info("Response gRPC message object: {:}".format(rand_response))
+#     LOGGER.info(" --->>> calling GetKpiDescriptor with random ID")
+#     rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
+#     LOGGER.info("Response gRPC message object: {:}".format(rand_response))
 
-    LOGGER.info("\n------------------ TEST FINISHED ---------------------\n")
-    assert isinstance(response, KpiDescriptor)
+#     LOGGER.info("\n------------------ TEST FINISHED ---------------------\n")
+#     assert isinstance(response, KpiDescriptor)
 
 # -------- Initial Test ----------------
-# def test_validate_kafka_topics():
-#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-#     response = KafkaTopic.create_all_topics()
-#     assert isinstance(response, bool)
+def test_validate_kafka_topics():
+    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+    response = KafkaTopic.create_all_topics()
+    assert isinstance(response, bool)
 
-# def test_KafkaConsumer():
-#     LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
-#     KpiValueWriter.RunKafkaConsumer()
+def test_KafkaConsumer():
+    LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
+    KpiValueWriter.RunKafkaConsumer()
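
For reference, the verify-then-create topic flow introduced in src/common/tools/kafka/Variables.py can be exercised standalone with the sketch below. It is not part of the patch: the broker address and topic names are illustrative placeholders, and the real code obtains the AdminClient from KafkaConfig.ADMIN_CLIENT and the topic list from the KafkaTopic enum. Unlike the per-topic loop in the patch, the sketch batches a single create_topics() call, which is purely a stylistic choice; both rely on the same confluent-kafka admin API.

# Reference sketch only (not part of the patch): verify-then-create Kafka topics,
# mirroring the flow used in src/common/tools/kafka/Variables.py.
# Assumptions: the broker address and topic names below are placeholders.
from confluent_kafka.admin import AdminClient, NewTopic

def create_topics_if_missing(admin_client, topic_names):
    try:
        # list_topics() returns ClusterMetadata; .topics is a dict keyed by topic name
        existing = admin_client.list_topics(timeout=5).topics
        missing  = [name for name in topic_names if name not in existing]
        if missing:
            futures = admin_client.create_topics(
                [NewTopic(name, num_partitions=1, replication_factor=1) for name in missing])
            for name, future in futures.items():
                future.result()   # raises KafkaException if creation failed
        return True
    except Exception as e:        # pylint: disable=broad-except
        print("Failed to create topic(s): {:}".format(e))
        return False

if __name__ == '__main__':
    admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})                  # placeholder broker
    print(create_topics_if_missing(admin_client, ['example-topic-a', 'example-topic-b']))  # placeholder names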