# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys print (sys.path) sys.path.append('/home/tfs/tfs-ctrl') import threading import logging from typing import Tuple # from common.proto.context_pb2 import Empty from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService LOGGER = logging.getLogger(__name__) ########################### # Tests Implementation of Telemetry Backend ########################### def test_verify_kafka_topics(): LOGGER.info('test_verify_kafka_topics requesting') TelemetryBackendServiceObj = TelemetryBackendService() KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) LOGGER.debug(str(response)) assert isinstance(response, bool) # def test_run_kafka_listener(): # LOGGER.info('test_receive_kafka_request requesting') # TelemetryBackendServiceObj = TelemetryBackendService() # response = TelemetryBackendServiceObj.run_kafka_listener() # LOGGER.debug(str(response)) # assert isinstance(response, bool) # def test_fetch_node_exporter_metrics(): # LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') # TelemetryBackendService.fetch_single_node_exporter_metric() def test_stream_node_export_metrics_to_raw_topic(): LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()