Skip to content
Snippets Groups Projects
Commit e44a93ba authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Verifies all logger statements for E2E test

parent ff9445f3
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!276Resolve "(CTTC) New Monitoring Module Testing and Debugging"
......@@ -48,7 +48,7 @@ class AnalyticsBackendService(GenericGrpcService):
listener for requests on Kafka topic.
"""
LOGGER.info("Request Listener is initiated ...")
print ("Request Listener is initiated ...")
# print ("Request Listener is initiated ...")
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while True:
......@@ -60,13 +60,13 @@ class AnalyticsBackendService(GenericGrpcService):
continue
else:
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
print ("Consumer error: {:}".format(receive_msg.error()))
# print ("Consumer error: {:}".format(receive_msg.error()))
break
try:
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
# print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.StopDaskListener(analyzer_uuid)
......@@ -74,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService):
self.StartDaskListener(analyzer_uuid, analyzer)
except Exception as e:
LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
# print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e))
def StartDaskListener(self, analyzer_uuid, analyzer):
kpi_list = analyzer[ 'input_kpis' ]
......@@ -84,8 +84,8 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
print ("Received parameters: {:} - {:} - {:} - {:}".format(
kpi_list, thresholds, window_size, window_slider))
# print ("Received parameters: {:} - {:} - {:} - {:}".format(
# kpi_list, thresholds, window_size, window_slider))
try:
stop_event = Event()
thread = Thread(
......@@ -98,11 +98,11 @@ class AnalyticsBackendService(GenericGrpcService):
)
thread.start()
self.running_threads[analyzer_uuid] = (thread, stop_event)
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
# print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
# print ("Failed to initiate Analyzer backend: {:}".format(e))
LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
return False
......@@ -113,12 +113,12 @@ class AnalyticsBackendService(GenericGrpcService):
stop_event.set()
thread.join()
del self.running_threads[analyzer_uuid]
print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
# print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
return True
except Exception as e:
LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
return False
else:
print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
# print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid))
LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
......@@ -31,10 +31,10 @@ LOGGER = logging.getLogger(__name__)
###########################
# --- "test_validate_kafka_topics" should be run before the functionality tests ---
# def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
def test_validate_kafka_topics():
    """Create all Kafka topics up front; the functionality tests below rely on them."""
    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    assert isinstance(KafkaTopic.create_all_topics(), bool)
# --- To test Dask Streamer functionality ---
......
......@@ -16,6 +16,7 @@ import logging, grpc, json, queue
from typing import Dict
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
......@@ -96,10 +97,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
key, value = self.result_queue.get()
yield key, value
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
......@@ -129,8 +128,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
......@@ -191,7 +188,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def delivery_callback(self, err, msg):
    """Kafka producer delivery-report callback.

    Invoked once per produced message: `err` is a KafkaError when delivery
    failed (None on success) and `msg` is the delivered Message object.
    Logs the outcome at DEBUG level; returns nothing.
    """
    if err:
        LOGGER.debug('Message delivery failed: {:}'.format(err))
    else:
        LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
......@@ -97,7 +97,7 @@ def test_validate_kafka_topics():
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
def test_StartAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
......
......@@ -55,7 +55,7 @@ class TelemetryBackendService(GenericGrpcService):
listener for requests on Kafka topic.
"""
LOGGER.info('Telemetry backend request listener is running ...')
print ('Telemetry backend request listener is running ...')
# print ('Telemetry backend request listener is running ...')
consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.REQUEST.value])
while True:
......@@ -66,13 +66,13 @@ class TelemetryBackendService(GenericGrpcService):
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
# print("Consumer error: {}".format(receive_msg.error()))
break
try:
collector = json.loads(receive_msg.value().decode('utf-8'))
collector_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
# print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
if collector['duration'] == -1 and collector['interval'] == -1:
self.TerminateCollectorBackend(collector_id)
......@@ -80,18 +80,19 @@ class TelemetryBackendService(GenericGrpcService):
self.RunInitiateCollectorBackend(collector_id, collector)
except Exception as e:
LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
# print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
def TerminateCollectorBackend(self, collector_id):
    """Stop the running backend thread for `collector_id`, if any.

    Signals the thread's stop event, joins it, removes it from the
    `running_threads` registry and emits a ("-1", -1) termination
    confirmation back to the frontend. Logs a warning when no backend
    is active for the given collector id.
    """
    if collector_id in self.running_threads:
        thread, stop_event = self.running_threads[collector_id]
        stop_event.set()
        thread.join()  # wait for the collector loop to observe the stop event
        del self.running_threads[collector_id]
        # Log at INFO for consistency with the analytics StopDaskListener;
        # previously the success path only had a (commented-out) print.
        LOGGER.info("Terminating backend (by StopCollector): Collector Id: {:}".format(collector_id))
        self.GenerateCollectorTerminationSignal(collector_id, "-1", -1)  # Termination confirmation to frontend.
    else:
        LOGGER.warning('Backend collector {:} not found'.format(collector_id))
def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
stop_event = threading.Event()
......@@ -104,7 +105,8 @@ class TelemetryBackendService(GenericGrpcService):
"""
Method receives collector request and initiates collecter backend.
"""
print("Initiating backend for collector: ", collector_id)
# print("Initiating backend for collector: ", collector_id)
LOGGER.info("Initiating backend for collector: ", collector_id)
start_time = time.time()
while not stop_event.is_set():
if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend
......@@ -136,8 +138,7 @@ class TelemetryBackendService(GenericGrpcService):
Method to extract kpi value.
"""
measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device
print ("Measured Kpi value: {:}".format(measured_kpi_value))
# measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI
# print ("Measured Kpi value: {:}".format(measured_kpi_value))
self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)
def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
......@@ -166,7 +167,7 @@ class TelemetryBackendService(GenericGrpcService):
"""
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print(f'Message delivery failed: {err}')
# print(f'Message delivery failed: {err}')
else:
LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
print(f'Message delivered to topic {msg.topic()}')
# print(f'Message delivered to topic {msg.topic()}')
......@@ -31,7 +31,7 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_RunRequestListener():
LOGGER.info('test_RunRequestListener')
TelemetryBackendServiceObj = TelemetryBackendService()
threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
\ No newline at end of file
# def test_RunRequestListener():
# LOGGER.info('test_RunRequestListener')
# TelemetryBackendServiceObj = TelemetryBackendService()
# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
\ No newline at end of file
......@@ -153,7 +153,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
"""
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print('Message delivery failed: {:}'.format(err))
# print('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
......@@ -177,7 +177,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(receive_msg.error()))
# print("Consumer error: {:}".format(receive_msg.error()))
LOGGER.error("Consumer error: {:}".format(receive_msg.error()))
break
try:
collector_id = receive_msg.key().decode('utf-8')
......@@ -185,13 +186,17 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
kpi_value = json.loads(receive_msg.value().decode('utf-8'))
self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value'])
else:
print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
# print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ")
LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS))
except Exception as e:
print(f"Error extarcting msg key or value: {str(e)}")
# print(f"Error extarcting msg key or value: {str(e)}")
LOGGER.info("Error extarcting msg key or value: {:}".format(e))
continue
def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
    """Dispatch a response message received from the telemetry backend.

    The sentinel pair (kpi_id == "-1", kpi_value == -1) is the backend's
    confirmation that the collector was terminated; any other pair is a
    measured KPI value for the given collector.
    """
    if kpi_id == "-1" and kpi_value == -1:
        # BUG fixed: LOGGER.info("...: ", collector_id) passed the id as a
        # %-format argument with no placeholder, so it was silently dropped.
        LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id))
    else:
        # BUG fixed: this branch previously logged the *termination* message
        # instead of reporting the received KPI value.
        LOGGER.info("KPI Value: Collector Id: {:}, Kpi Id: {:}, Value: {:}".format(
            collector_id, kpi_id, kpi_value))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment