Commit 10a7540c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

"KafkaProducerService" and "KafkaProducerServiceImpl" class name changed to...

"KafkaProducerService" and "KafkaProducerServiceImpl" class name changed to "TelemetryBackendService" and "TelemetryBackendServiceImpl" respectivily
parent f05d4020
Loading
Loading
Loading
Loading
+1 −17
Original line number Diff line number Diff line
@@ -12,9 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .KafkaProducerServiceImpl import KafkaProducerServiceImpl

class KafkaProducerService:
class TelemetryBackendService:
    """
    Class to control Kafka producer functionality.
    """
@@ -35,19 +33,5 @@ class KafkaProducerService:
        }
        return create_kafka_configs

    def run_producer(self):
        """
        Method to create KafkaProducerServiceImpl object and start producer.
        """
        # Create NodeExporterProducer object and run start_producer_thread
        producer = KafkaProducerServiceImpl(self.bootstrap_servers, self.node_exporter_endpoint, 
                    self.kafka_topic, self.run_duration, self.fetch_interval
                    )
        # producer.start_producer_thread()    # if threading is required
        producer.produce_metrics()      # if threading is not required

if __name__ == "__main__":
    # Create Kafka producer service object and run producer
    kafka_controller = KafkaProducerService()
    kafka_controller.run_producer()
+1 −8
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend')

class KafkaProducerServiceImpl:
class TelemetryBackendServiceImpl:
    """
    Class to fetch metrics from Exporter and produce them to Kafka.
    """
@@ -171,12 +171,5 @@ class KafkaProducerServiceImpl:
        finally:
            kafka_producer.flush()
            # kafka_producer.close()        # this command generates ERROR
    # ---
    def start_producer_thread(self):
        """
        Method to start the producer thread.
        """
        producer_thread = threading.Thread(target=self.produce_metrics)
        producer_thread.start()

# ----------- ABOVE: Actual Implementation of Kafka Producer with Node Exporter -----------
 No newline at end of file
+72 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import signal
import logging
import threading
from prometheus_client import start_http_server
from monitoring.service.NameMapping import NameMapping
from .KafkaProducerService import KafkaProducerService
from common.Settings import (
    get_log_level,
    get_metrics_port)

terminate = threading.Event()
LOGGER = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
    terminate.set()

def main():
    global LOGGER

    log_level = get_log_level()
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

# ------- will be added later --------------
    # wait_for_environment_variables([
    #     get_env_var_name


    # ])
# ------- will be added later --------------

    signal.signal(signal.SIGINT,  signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting Telemetry Backend...')

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    name_mapping = NameMapping()

    grpc_service = KafkaProducerService(name_mapping)
    grpc_service.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass

    LOGGER.info('Terminating...')
    grpc_service.stop()

    LOGGER.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
 No newline at end of file
+6 −6
Original line number Diff line number Diff line
@@ -18,8 +18,8 @@ import logging
from typing import Tuple
from common.proto.context_pb2 import Empty
from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id
from src.telemetry.backend.service.KafkaProducerService import KafkaProducerService
from src.telemetry.backend.service.KafkaProducerServiceImpl import KafkaProducerServiceImpl
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl

LOGGER = logging.getLogger(__name__)

@@ -29,14 +29,14 @@ LOGGER = logging.getLogger(__name__)
###########################
def test_get_kafka_configs():
    LOGGER.warning('test_get_kafka_configs requesting')
    KafkaProducerServiceObj = KafkaProducerService()
    KafkaProducerServiceObj = TelemetryBackendService()
    response = KafkaProducerServiceObj.generate_kafka_configs()
    LOGGER.debug(str(response))
    assert isinstance(response, dict)

def test_export_collector_value():
    LOGGER.warning('test_export_collector_value requesting')
    response = KafkaProducerServiceImpl.export_collector_value(
    response = TelemetryBackendServiceImpl.export_collector_value(
        create_collector_request('1')
    )
    LOGGER.debug(str(response))
@@ -44,7 +44,7 @@ def test_export_collector_value():

def test_write_to_kafka():
    LOGGER.warning('test_write_to_kafka requesting')
    collector_value = KafkaProducerServiceImpl.export_collector_value(create_collector_request('1'))
    response = KafkaProducerServiceImpl.write_to_kafka(collector_value)      # type: ignore (don't know why warning here)
    collector_value = TelemetryBackendServiceImpl.export_collector_value(create_collector_request('1'))
    response = TelemetryBackendServiceImpl.write_to_kafka(collector_value)      # type: ignore (don't know why warning here)
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)
+2 −1
Original line number Diff line number Diff line
@@ -15,8 +15,9 @@
import signal
import sys
import logging, threading
from .TelemetryFrontendService import TelemetryFrontendService
from prometheus_client import start_http_server
from monitoring.service.NameMapping import NameMapping
from .TelemetryFrontendService import TelemetryFrontendService
from monitoring.service.EventTools import EventsDeviceCollector
from common.Settings import (
    get_log_level, wait_for_environment_variables, get_env_var_name,