Skip to content
Snippets Groups Projects
Commit 6bb6d617 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

file nae change to "testTelemetryBackend"

parent 45f62a56
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!207Resolve "(CTTC) Separation of Monitoring"
......@@ -16,6 +16,7 @@
# print (sys.path)
import logging
from typing import Tuple
from confluent_kafka import Producer as KafkaProducer
from common.proto.context_pb2 import Empty
from src.telemetry.frontend.tests.Messages import create_collector_request, create_collector_id
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
......@@ -29,22 +30,30 @@ LOGGER = logging.getLogger(__name__)
###########################
def test_get_kafka_configs():
LOGGER.warning('test_get_kafka_configs requesting')
KafkaProducerServiceObj = TelemetryBackendService()
response = KafkaProducerServiceObj.generate_kafka_configs()
TelemetryBackendServiceObj = TelemetryBackendService()
response = TelemetryBackendServiceObj.generate_kafka_configs()
LOGGER.debug(str(response))
assert isinstance(response, dict)
def test_export_collector_value():
LOGGER.warning('test_export_collector_value requesting')
response = TelemetryBackendServiceImpl.export_collector_value(
create_collector_request('1')
)
TelemetryBackendServiceObj = TelemetryBackendServiceImpl()
response = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1'))
LOGGER.debug(str(response))
assert isinstance(response, Tuple)
def test_write_to_kafka():
LOGGER.warning('test_write_to_kafka requesting')
collector_value = TelemetryBackendServiceImpl.export_collector_value(create_collector_request('1'))
response = TelemetryBackendServiceImpl.write_to_kafka(collector_value) # type: ignore (don't know why warning here)
TelemetryBackendServiceObj = TelemetryBackendServiceImpl()
_collector_value = TelemetryBackendServiceObj.export_collector_value(create_collector_request('1'))
response = TelemetryBackendServiceObj.write_to_kafka(_collector_value)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
assert isinstance(response, KafkaProducer)
def test_stop_producer():
LOGGER.warning('test_write_to_kafka requesting')
_kafka_configs = {'bootstrap.servers': '127.0.0.1:9092'}
TelemetryBackendServiceObj = TelemetryBackendServiceImpl()
response = TelemetryBackendServiceObj.stop_producer(KafkaProducer(_kafka_configs))
LOGGER.debug(str(response))
assert isinstance(response, Empty)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment