diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 34e9e0542bf1d46fc03cb2442abdd0a7b6c5961d..8f72fb28324f47b17b7a9549746b461864697238 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -24,5 +24,5 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=INFO --verbose \ +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/backend/tests/testTelemetryBackend.py \ No newline at end of file diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index b8888bf8b26a3868ffd5dba443d5280437ab4c6d..f2e5ff3ace508360cdb7ceb344f8e2f275ec31a8 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -33,7 +33,6 @@ LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Telemetry', 'TelemetryBackend') KAFKA_SERVER_IP = '127.0.0.1:9092' ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) -ACTIVE_COLLECTORS = [] KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response'} EXPORTER_ENDPOINT = "http://node-exporter-7465c69b87-b6ks5.telebackend:9100/metrics" @@ -45,6 +44,7 @@ class TelemetryBackendService: def __init__(self): LOGGER.info('Init TelemetryBackendService') + self.running_threads = {} def run_kafka_listener(self)->bool: threading.Thread(target=self.kafka_listener).start() @@ -68,7 +68,7 @@ class TelemetryBackendService: receive_msg = consumerObj.poll(2.0) if receive_msg is None: # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", topic_request) # added for debugging purposes - print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes + # print (time.time(), " - Telemetry backend is listening on Kafka Topic: ", KAFKA_TOPICS['request']) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -78,23 +78,31 @@ class TelemetryBackendService: break (kpi_id, duration, interval) = ast.literal_eval(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') - self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval) + if duration == -1 and interval == -1: + self.terminate_collector_backend(collector_id) + # threading.Thread(target=self.terminate_collector_backend, args=(collector_id)) + else: + self.run_initiate_collector_backend(collector_id, kpi_id, duration, interval) def run_initiate_collector_backend(self, collector_id: str, kpi_id: str, duration: int, interval: int): - threading.Thread(target=self.initiate_collector_backend, args=(collector_id, kpi_id, duration, interval)).start() + stop_event = threading.Event() + thread = threading.Thread(target=self.initiate_collector_backend, + args=(collector_id, kpi_id, duration, interval, stop_event)) + self.running_threads[collector_id] = (thread, stop_event) + thread.start() - def initiate_collector_backend(self, collector_id, kpi_id, duration, interval + def initiate_collector_backend(self, collector_id, kpi_id, duration, interval, stop_event ): # type: ignore """ Method to receive collector request attribues and initiates collecter backend. """ + print("Initiating backend for collector: ", collector_id) start_time = time.time() - while True: - ACTIVE_COLLECTORS.append(collector_id) + while not stop_event.is_set(): if time.time() - start_time >= duration: # condition to terminate backend - print("Execution Time Completed: \n --- Consumer terminating: KPI ID: ", kpi_id, " - ", time.time() - start_time) - self.generate_kafka_response(collector_id, "NULL", False) + print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) + self.generate_kafka_response(collector_id, "-1", -1) # write to Kafka to send the termination confirmation. break # print ("Received KPI: ", kpi_id, ", Duration: ", duration, ", Fetch Interval: ", interval) @@ -125,6 +133,15 @@ class TelemetryBackendService: producerObj.produce(KAFKA_TOPICS['response'], key=msg_key, value= str(msg_value), callback=self.delivery_callback) producerObj.flush() + def terminate_collector_backend(self, collector_id): + if collector_id in self.running_threads: + thread, stop_event = self.running_threads[collector_id] + stop_event.set() + thread.join() + print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + del self.running_threads[collector_id] + self.generate_kafka_response(collector_id, "-1", -1) + def create_topic_if_not_exists(self, new_topics: list) -> bool: """ Method to create Kafka topic if it does not exist. diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index e3e8bbc4b7cc65b699892cf9974b4b3875bf2cf0..b8b29d04a424e37d35b714a946ae5c66a658eda2 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -20,8 +20,6 @@ from typing import Tuple from common.proto.context_pb2 import Empty from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService - - LOGGER = logging.getLogger(__name__) @@ -30,7 +28,7 @@ LOGGER = logging.getLogger(__name__) ########################### def test_verify_kafka_topics(): - LOGGER.warning('test_receive_kafka_request requesting') + LOGGER.info('test_verify_kafka_topics requesting') TelemetryBackendServiceObj = TelemetryBackendService() KafkaTopics = ['topic_request', 'topic_response'] response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) @@ -38,7 +36,7 @@ def test_verify_kafka_topics(): assert isinstance(response, bool) def test_run_kafka_listener(): - LOGGER.warning('test_receive_kafka_request requesting') + LOGGER.info('test_receive_kafka_request requesting') TelemetryBackendServiceObj = TelemetryBackendService() response = TelemetryBackendServiceObj.run_kafka_listener() LOGGER.debug(str(response)) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 245f92f81f7ca41a6172d6c109063fc0e48f0ada..2fab04b31c0826bfa4f379b3de969850d2a9d3d9 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -44,9 +44,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init TelemetryFrontendService') self.managementDBobj = managementDB() + self.kafka_producer = KafkaProducer({'bootstrap.servers': KAFKA_SERVER_IP,}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'frontend', + 'auto.offset.reset' : 'latest'}) - - def add_collector_to_db(self, request: Collector ): + def add_collector_to_db(self, request: Collector ): # type: ignore try: # Create a new Collector instance collector_to_insert = CollectorModel() @@ -66,6 +69,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): request : Collector, grpc_context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore # push info to frontend db + LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() _collector_id = str(request.collector_id.collector_id.uuid) _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) @@ -73,37 +77,36 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): _collector_interval = int(request.interval_s) # pushing Collector to DB self.add_collector_to_db(request) - self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) - # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) + self.publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) + # self.run_publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore return response - def run_generate_kafka_request(self, msg_key: str, kpi: str, duration : int, interval: int): - threading.Thread(target=self.generate_kafka_request, args=(msg_key, kpi, duration, interval)).start() + def run_publish_to_kafka_request_topic(self, msg_key: str, kpi: str, duration : int, interval: int): + # Add threading.Thread() response to dictonary and call start() in the next statement + threading.Thread(target=self.publish_to_kafka_request_topic, args=(msg_key, kpi, duration, interval)).start() - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def generate_kafka_request(self, - msg_key: str, kpi: str, duration : int, interval: int - ) -> KafkaProducer: + def publish_to_kafka_request_topic(self, + collector_id: str, kpi: str, duration : int, interval: int + ): """ Method to generate collector request to Kafka topic. """ # time.sleep(5) - producer_configs = { - 'bootstrap.servers': KAFKA_SERVER_IP, - } + # producer_configs = { + # 'bootstrap.servers': KAFKA_SERVER_IP, + # } # topic_request = "topic_request" - msg_value = Tuple [str, int, int] - msg_value = (kpi, duration, interval) - # print ("Request generated: ", "Colletcor Id: ", msg_key, \ + msg_value : Tuple [str, int, int] = (kpi, duration, interval) + # print ("Request generated: ", "Colletcor Id: ", collector_id, \ # ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) - producerObj = KafkaProducer(producer_configs) - producerObj.produce(KAFKA_TOPICS['request'], key=msg_key, value= str(msg_value), callback=self.delivery_callback) - LOGGER.info("Collector Request Generated: {:} -- {:} -- {:} -- {:}".format(msg_key, kpi, duration, interval)) - # producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) - ACTIVE_COLLECTORS.append(msg_key) - producerObj.flush() - return producerObj + # producerObj = KafkaProducer(producer_configs) + self.kafka_producer.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) + # producerObj.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) + LOGGER.info("Collector Request Generated: {:}, {:}, {:}, {:}".format(collector_id, kpi, duration, interval)) + # producerObj.produce(topic_request, key=collector_id, value= str(msg_value), callback=self.delivery_callback) + ACTIVE_COLLECTORS.append(collector_id) + self.kafka_producer.flush() def run_kafka_listener(self): # print ("--- STARTED: run_kafka_listener ---") @@ -114,21 +117,21 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ listener for response on Kafka topic. """ - # print ("--- STARTED: kafka_listener ---") - conusmer_configs = { - 'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'frontend', - 'auto.offset.reset' : 'latest' - } - # topic_response = "topic_response" - - consumerObj = KafkaConsumer(conusmer_configs) - consumerObj.subscribe([KAFKA_TOPICS['response']]) + # # print ("--- STARTED: kafka_listener ---") + # conusmer_configs = { + # 'bootstrap.servers' : KAFKA_SERVER_IP, + # 'group.id' : 'frontend', + # 'auto.offset.reset' : 'latest' + # } + # # topic_response = "topic_response" + + # consumerObj = KafkaConsumer(conusmer_configs) + self.kafka_consumer.subscribe([KAFKA_TOPICS['response']]) # print (time.time()) while True: - receive_msg = consumerObj.poll(2.0) + receive_msg = self.kafka_consumer.poll(2.0) if receive_msg is None: - print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes + # print (" - Telemetry frontend listening on Kafka Topic: ", KAFKA_TOPICS['response']) # added for debugging purposes continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: @@ -140,7 +143,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): collector_id = receive_msg.key().decode('utf-8') if collector_id in ACTIVE_COLLECTORS: (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) - self.process_response(kpi_id, kpi_value) + self.process_response(collector_id, kpi_id, kpi_value) else: print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") except Exception as e: @@ -148,8 +151,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): continue # return None - def process_response(self, kpi_id: str, kpi_value: Any): - print ("Frontend - KPI: ", kpi_id, ", VALUE: ", kpi_value) + def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): + if kpi_id == "-1" and kpi_value == -1: + # LOGGER.info("Sucessfully terminated Collector: {:}".format(collector_id)) + print ("Sucessfully terminated Collector: ", collector_id) + else: + print ("Frontend-Received values Collector Id:", collector_id, "-KPI:", kpi_id, "-VALUE:", kpi_value) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): @@ -168,12 +175,30 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore - request.collector_id.uuid = "" + LOGGER.info ("gRPC message: {:}".format(request)) + _collector_id = request.collector_id.uuid + self.publish_to_kafka_request_topic(_collector_id, "", -1, -1) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore + LOGGER.info("gRPC message: {:}".format(request)) response = CollectorList() - return response \ No newline at end of file + filter_to_apply = dict() + filter_to_apply['kpi_id'] = request.kpi_id[0].kpi_id.uuid + # filter_to_apply['duration_s'] = request.duration_s[0] + try: + rows = self.managementDBobj.select_with_filter(CollectorModel, **filter_to_apply) + except Exception as e: + LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) + try: + if len(rows) != 0: + for row in rows: + collector_obj = Collector() + collector_obj.collector_id.collector_id.uuid = row.collector_id + response.collector_list.append(collector_obj) + return response + except Exception as e: + LOGGER.info('Unable to process response {:}'.format(e)) \ No newline at end of file diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 0a33de63ef03ca0db12d6167180172bde8b167a6..48668f7bfbbd016e495d96dea2f3219dc2dc0c62 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -22,10 +22,10 @@ def create_collector_id(): _collector_id.collector_id.uuid = uuid.uuid4() return _collector_id -def create_collector_id_a(coll_id_str : str): - _collector_id = telemetry_frontend_pb2.CollectorId() - _collector_id.collector_id.uuid = str(coll_id_str) - return _collector_id +# def create_collector_id_a(coll_id_str : str): +# _collector_id = telemetry_frontend_pb2.CollectorId() +# _collector_id.collector_id.uuid = str(coll_id_str) +# return _collector_id def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() @@ -35,39 +35,45 @@ def create_collector_request(): _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request -def create_collector_request_a(): - _create_collector_request_a = telemetry_frontend_pb2.Collector() - _create_collector_request_a.collector_id.collector_id.uuid = "-1" - return _create_collector_request_a - -def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s - ) -> telemetry_frontend_pb2.Collector: - _create_collector_request_b = telemetry_frontend_pb2.Collector() - _create_collector_request_b.collector_id.collector_id.uuid = '1' - _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id - _create_collector_request_b.duration_s = coll_duration_s - _create_collector_request_b.interval_s = coll_interval_s - return _create_collector_request_b - def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() - new_collector_id = _create_collector_filter.collector_id.add() - new_collector_id.collector_id.uuid = "COLL1" new_kpi_id = _create_collector_filter.kpi_id.add() - new_kpi_id.kpi_id.uuid = "KPI1" - new_device_id = _create_collector_filter.device_id.add() - new_device_id.device_uuid.uuid = 'DEV1' - new_service_id = _create_collector_filter.service_id.add() - new_service_id.service_uuid.uuid = 'SERV1' - new_slice_id = _create_collector_filter.slice_id.add() - new_slice_id.slice_uuid.uuid = 'SLC1' - new_endpoint_id = _create_collector_filter.endpoint_id.add() - new_endpoint_id.endpoint_uuid.uuid = 'END1' - new_connection_id = _create_collector_filter.connection_id.add() - new_connection_id.connection_uuid.uuid = 'CON1' - _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) + new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" return _create_collector_filter -def create_collector_list(): - _create_collector_list = telemetry_frontend_pb2.CollectorList() - return _create_collector_list \ No newline at end of file +# def create_collector_request_a(): +# _create_collector_request_a = telemetry_frontend_pb2.Collector() +# _create_collector_request_a.collector_id.collector_id.uuid = "-1" +# return _create_collector_request_a + +# def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s +# ) -> telemetry_frontend_pb2.Collector: +# _create_collector_request_b = telemetry_frontend_pb2.Collector() +# _create_collector_request_b.collector_id.collector_id.uuid = '1' +# _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id +# _create_collector_request_b.duration_s = coll_duration_s +# _create_collector_request_b.interval_s = coll_interval_s +# return _create_collector_request_b + +# def create_collector_filter(): +# _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() +# new_collector_id = _create_collector_filter.collector_id.add() +# new_collector_id.collector_id.uuid = "COLL1" +# new_kpi_id = _create_collector_filter.kpi_id.add() +# new_kpi_id.kpi_id.uuid = "KPI1" +# new_device_id = _create_collector_filter.device_id.add() +# new_device_id.device_uuid.uuid = 'DEV1' +# new_service_id = _create_collector_filter.service_id.add() +# new_service_id.service_uuid.uuid = 'SERV1' +# new_slice_id = _create_collector_filter.slice_id.add() +# new_slice_id.slice_uuid.uuid = 'SLC1' +# new_endpoint_id = _create_collector_filter.endpoint_id.add() +# new_endpoint_id.endpoint_uuid.uuid = 'END1' +# new_connection_id = _create_collector_filter.connection_id.add() +# new_connection_id.connection_uuid.uuid = 'CON1' +# _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) +# return _create_collector_filter + +# def create_collector_list(): +# _create_collector_list = telemetry_frontend_pb2.CollectorList() +# return _create_collector_list \ No newline at end of file diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 230122a2d2e7b4cab76296766a95d78898cbd2bc..7d050349bcfaa37835520d12aa312ead777af8e5 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -31,8 +31,7 @@ from common.Settings import ( from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl -from telemetry.frontend.tests.Messages import ( create_collector_id, create_collector_request, - create_collector_filter, create_collector_request_a, create_collector_request_b) +from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter) from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService @@ -167,43 +166,31 @@ def telemetryFrontend_client( # Tests Implementation of Telemetry Frontend ########################### -def test_start_collector(telemetryFrontend_client): - LOGGER.info('test_start_collector requesting') +def test_StartCollector(telemetryFrontend_client): + LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) -# def test_start_collector_a(telemetryFrontend_client): -# LOGGER.warning('test_start_collector requesting') -# response = telemetryFrontend_client.StartCollector(create_collector_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorId) - -# def test_start_collector_b(telemetryFrontend_client): -# LOGGER.warning('test_start_collector requesting') -# response = telemetryFrontend_client.StartCollector(create_collector_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorId) - -# def test_run_kafka_listener(): -# LOGGER.warning('test_receive_kafka_request requesting') -# name_mapping = NameMapping() -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) -# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - - - +def test_run_kafka_listener(): + LOGGER.info(' >>> test_run_kafka_listener START: <<< ') + name_mapping = NameMapping() + TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) + response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto + LOGGER.debug(str(response)) + assert isinstance(response, bool) -# def test_stop_collector(telemetryFrontend_client): -# LOGGER.warning('test_stop_collector requesting') -# response = telemetryFrontend_client.StopCollector(create_collector_id("1")) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_StopCollector(telemetryFrontend_client): + LOGGER.info(' >>> test_StopCollector START: <<< ') + _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) + time.sleep(3) # wait for small amount before call the stopCollecter() + response = telemetryFrontend_client.StopCollector(_collector_id) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) -# def test_select_collectors(telemetryFrontend_client): -# LOGGER.warning('test_select_collector requesting') -# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorList) \ No newline at end of file +def test_select_collectors(telemetryFrontend_client): + LOGGER.info(' >>> test_select_collector requesting <<< ') + response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) + LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorList) \ No newline at end of file