# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# import sys
# print (sys.path)
import logging
from typing import Tuple
from confluent_kafka import Producer as KafkaProducer
from common.proto.context_pb2 import Empty
from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from src.telemetry.backend.service.TelemetryBackendServiceImpl import TelemetryBackendServiceImpl

LOGGER = logging.getLogger(__name__)


###########################
# Tests Implementation of Telemetry Backend
###########################

def test_get_kafka_configs():
    """Check that ``TelemetryBackendService.generate_kafka_configs`` returns a dict."""
    LOGGER.warning('test_get_kafka_configs requesting')
    backend_service = TelemetryBackendService()
    response = backend_service.generate_kafka_configs()
    LOGGER.debug(str(response))
    assert isinstance(response, dict)


def test_export_collector_value():
    """Check that ``export_collector_value`` returns a tuple.

    The request is built with ``create_collector_request('1')``.
    """
    LOGGER.warning('test_export_collector_value requesting')
    backend_impl = TelemetryBackendServiceImpl()
    response = backend_impl.export_collector_value(create_collector_request('1'))
    LOGGER.debug(str(response))
    assert isinstance(response, Tuple)


def test_write_to_kafka():
    """Check that ``write_to_kafka`` returns a ``KafkaProducer``.

    The value written is whatever ``export_collector_value`` returns for
    ``create_collector_request('1')``.
    """
    LOGGER.warning('test_write_to_kafka requesting')
    backend_impl = TelemetryBackendServiceImpl()
    collector_value = backend_impl.export_collector_value(create_collector_request('1'))
    response = backend_impl.write_to_kafka(collector_value)
    LOGGER.debug(str(response))
    assert isinstance(response, KafkaProducer)


def test_stop_producer():
    """Check that ``stop_producer`` returns an ``Empty`` message.

    A producer is created against ``127.0.0.1:9092`` and handed to
    ``stop_producer``.
    NOTE(review): presumably needs a Kafka broker reachable at that address
    for the call to complete -- confirm against the test environment.
    """
    # Log this test's own name; the message previously said
    # 'test_write_to_kafka', which mislabelled the running test in the logs.
    LOGGER.warning('test_stop_producer requesting')
    kafka_configs = {'bootstrap.servers': '127.0.0.1:9092'}
    backend_impl = TelemetryBackendServiceImpl()
    response = backend_impl.stop_producer(KafkaProducer(kafka_configs))
    LOGGER.debug(str(response))
    assert isinstance(response, Empty)