diff --git a/src/telemetry/backend/service/KafkaProducerService.py b/src/telemetry/backend/service/TelemetryBackendService.py similarity index 64% rename from src/telemetry/backend/service/KafkaProducerService.py rename to src/telemetry/backend/service/TelemetryBackendService.py index 0a76b2d998258ca42dada4a0618889db43cb34d3..8e6fb243ea324b9eb572c165a43a8bbaf22466f3 100755 --- a/src/telemetry/backend/service/KafkaProducerService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,9 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .KafkaProducerServiceImpl import KafkaProducerServiceImpl - -class KafkaProducerService: +class TelemetryBackendService: """ Class to control Kafka producer functionality. """ @@ -35,19 +33,5 @@ class KafkaProducerService: } return create_kafka_configs - def run_producer(self): - """ - Method to create KafkaProducerServiceImpl object and start producer. - """ - # Create NodeExporterProducer object and run start_producer_thread - producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint, - self.kafka_topic, self.run_duration, self.fetch_interval - ) - # producer.start_producer_thread() # if threading is required - producer.produce_metrics() # if threading is not required -if __name__ == "__main__": - # Create Kafka producer service object and run producer - kafka_controller = KafkaProducerService() - kafka_controller.run_producer() diff --git a/src/telemetry/backend/service/KafkaProducerServiceImpl.py b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py similarity index 96% rename from src/telemetry/backend/service/KafkaProducerServiceImpl.py rename to src/telemetry/backend/service/TelemetryBackendServiceImpl.py index da55131700f366416304407579afe2d7f7aab00e..ea57f6167e994ff5367a2720e6c9473b77e6a7e9 100755 --- a/src/telemetry/backend/service/KafkaProducerServiceImpl.py +++ b/src/telemetry/backend/service/TelemetryBackendServiceImpl.py @@ -27,7 +27,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') -class KafkaProducerServiceImpl: +class TelemetryBackendServiceImpl: """ Class to fetch metrics from Exporter and produce them to Kafka. """ @@ -171,12 +171,5 @@ class KafkaProducerServiceImpl: finally: kafka_producer.flush() # kafka_producer.close() # this command generates ERROR - # --- - def start_producer_thread(self): - """ - Method to start the producer thread. - """ - producer_thread = threading.Thread(target=self.produce_metrics) - producer_thread.start() # ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter ----------- \ No newline at end of file diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..10c3f76d1d2f32bf610fc638d4a2e990b6c3f53a --- /dev/null +++ b/src/telemetry/backend/service/__main__.py @@ -0,0 +1,72 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import signal +import logging +import threading +from prometheus_client import start_http_server +from monitoring.service.NameMapping import NameMapping +from .KafkaProducerService import KafkaProducerService +from common.Settings import ( + get_log_level, + get_metrics_port) + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + +def main(): + global LOGGER + + log_level = get_log_level() + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") + LOGGER = logging.getLogger(__name__) + +# ------- will be added later -------------- + # wait_for_environment_variables([ + # get_env_var_name + + + # ]) +# ------- will be added later -------------- + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting Telemetry Backend...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + name_mapping = NameMapping() + + grpc_service = KafkaProducerService(name_mapping) + grpc_service.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=1.0): pass + + LOGGER.info('Terminating...') + grpc_service.stop() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/src/telemetry/backend/tests/test_kafka_backend.py b/src/telemetry/backend/tests/test_kafka_backend.py index 05174da2b2886801f2127f817b2f3fc0d4e29797..ac49bc30f206c2c4a4f80940d8cb2e2a274c1acf 100644 --- a/src/telemetry/backend/tests/test_kafka_backend.py +++ b/src/telemetry/backend/tests/test_kafka_backend.py @@ -18,8 +18,8 @@ import logging from typing import Tuple from common.proto.context_pb2 import Empty from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id -from src.telemetry.backend.service.KafkaProducerService import KafkaProducerService -from src.telemetry.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl +from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService +from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl LOGGER = logging.getLogger(__name__) @@ -29,14 +29,14 @@ LOGGER = logging.getLogger(__name__) ########################### def test_get_kafka_configs(): LOGGER.warning('test_get_kafka_configs requesting') - KafkaProducerServiceObj = KafkaProducerService() + KafkaProducerServiceObj = TelemetryBackendService() response = KafkaProducerServiceObj.generate_kafka_configs() LOGGER.debug(str(response)) assert isinstance(response, dict) def test_export_collector_value(): LOGGER.warning('test_export_collector_value requesting') - response = KafkaProducerServiceImpl.export_collector_value( + response = TelemetryBackendServiceImpl.export_collector_value( create_collector_request('1') ) LOGGER.debug(str(response)) @@ -44,7 +44,7 @@ def test_export_collector_value(): def test_write_to_kafka(): LOGGER.warning('test_write_to_kafka requesting') - collector_value = KafkaProducerServiceImpl.export_collector_value(create_collector_request('1')) - response = KafkaProducerServiceImpl.write_to_kafka(collector_value) # type: ignore (don't know why warning here) + collector_value = TelemetryBackendServiceImpl.export_collector_value(create_collector_request('1')) + response = TelemetryBackendServiceImpl.write_to_kafka(collector_value) # type: ignore (don't know why warning here) LOGGER.debug(str(response)) assert isinstance(response, Empty) diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index afc381e09308a938cdf784aba6e4667ca2c77578..0f48a4de10168ec5d238a2f0bbdd7a97b0e481c5 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -15,8 +15,9 @@ import signal import sys import logging, threading -from .TelemetryFrontendService import TelemetryFrontendService +from prometheus_client import start_http_server from monitoring.service.NameMapping import NameMapping +from .TelemetryFrontendService import TelemetryFrontendService from monitoring.service.EventTools import EventsDeviceCollector from common.Settings import ( get_log_level, wait_for_environment_variables, get_env_var_name,