diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index f9c3b86a2eafe8a82691fa992027b026a1e9aa18..a8c790e786f2768091ee3ab8d8d61860f18c2f77 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -48,7 +48,7 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") - print ("Request Listener is initiated ...") + # print ("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: @@ -60,13 +60,13 @@ class AnalyticsBackendService(GenericGrpcService): continue else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - print ("Consumer error: {:}".format(receive_msg.error())) + # print ("Consumer error: {:}".format(receive_msg.error())) break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: self.StopDaskListener(analyzer_uuid) @@ -74,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService): self.StartDaskListener(analyzer_uuid, analyzer) except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + # print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) def StartDaskListener(self, analyzer_uuid, analyzer): kpi_list = analyzer[ 'input_kpis' ] @@ -84,8 +84,8 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( kpi_list, thresholds, window_size, window_slider)) - print ("Received parameters: {:} - {:} - {:} - {:}".format( - kpi_list, thresholds, window_size, window_slider)) + # print ("Received parameters: {:} - {:} - {:} - {:}".format( + # kpi_list, thresholds, window_size, window_slider)) try: stop_event = Event() thread = Thread( @@ -98,11 +98,11 @@ class AnalyticsBackendService(GenericGrpcService): ) thread.start() self.running_threads[analyzer_uuid] = (thread, stop_event) - print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) return True except Exception as e: - print ("Failed to initiate Analyzer backend: {:}".format(e)) + # print ("Failed to initiate Analyzer backend: {:}".format(e)) LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) return False @@ -113,12 +113,12 @@ class AnalyticsBackendService(GenericGrpcService): stop_event.set() thread.join() del self.running_threads[analyzer_uuid] - print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) return True except Exception as e: LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) return False else: - print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) + # print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 79d760f8ec06e0f28e9ce98ffd5ee460a5c66c37..01d71df84332ce4bf8b1cdd1d44a3f1897c724d2 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -31,10 +31,10 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) # --- To test Dask Streamer functionality --- diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index e304d7acb1f1346128dc1f9268034af6b93185da..323113bb0d8234f41961d05a049986296167b96b 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -16,6 +16,7 @@ import logging, grpc, json, queue from typing import Dict from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty @@ -96,10 +97,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: while True: - LOGGER.info("entering while...") - key, value = self.result_queue.get() # Wait until a result is available - LOGGER.info("In while true ...") - yield key, value # Yield the result to the calling function + key, value = self.result_queue.get() + yield key, value except KeyboardInterrupt: LOGGER.warning("Listener stopped manually.") finally: @@ -129,8 +128,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): self.result_queue.put((key, value)) else: LOGGER.info(f"Skipping message with unmatched key: {key}") - # value = json.loads(msg.value().decode('utf-8')) # Added for debugging - # self.result_queue.put((filter_key, value)) # Added for debugging except Exception as e: LOGGER.error(f"Error processing Kafka message: {e}") @@ -191,7 +188,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print ('Message delivery failed: {:}'.format(err)) + # print ('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - print('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 3898ec65e81b18214dc9cb014222312de3a4fd19..526c32eb8af98afde6b89e784f62f0a2d0f7f432 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -97,7 +97,7 @@ def test_validate_kafka_topics(): # assert isinstance(response, AnalyzerId) # To test start and stop listener together -def test_StartStopAnalyzers(analyticsFrontend_client): +def test_StartAnalyzers(analyticsFrontend_client): LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index f3cf18d65eac0fda6c56a366ce8d806a4ea2562a..79a35d343860d19992518c0e8b29e427e5cbbef4 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -55,7 +55,7 @@ class TelemetryBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info('Telemetry backend request listener is running ...') - print ('Telemetry backend request listener is running ...') + # print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.REQUEST.value]) while True: @@ -66,13 +66,13 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {}".format(receive_msg.error())) break try: collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + # print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) if collector['duration'] == -1 and collector['interval'] == -1: self.TerminateCollectorBackend(collector_id) @@ -80,18 +80,19 @@ class TelemetryBackendService(GenericGrpcService): self.RunInitiateCollectorBackend(collector_id, collector) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) - print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + # print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) def TerminateCollectorBackend(self, collector_id): if collector_id in self.running_threads: thread, stop_event = self.running_threads[collector_id] stop_event.set() thread.join() - print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: - print ('Backend collector {:} not found'.format(collector_id)) + # print ('Backend collector {:} not found'.format(collector_id)) + LOGGER.warning('Backend collector {:} not found'.format(collector_id)) def RunInitiateCollectorBackend(self, collector_id: str, collector: str): stop_event = threading.Event() @@ -104,7 +105,8 @@ class TelemetryBackendService(GenericGrpcService): """ Method receives collector request and initiates collecter backend. """ - print("Initiating backend for collector: ", collector_id) + # print("Initiating backend for collector: ", collector_id) + LOGGER.info("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend @@ -136,8 +138,7 @@ class TelemetryBackendService(GenericGrpcService): Method to extract kpi value. """ measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device - print ("Measured Kpi value: {:}".format(measured_kpi_value)) - # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI + # print ("Measured Kpi value: {:}".format(measured_kpi_value)) self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): @@ -166,7 +167,7 @@ class TelemetryBackendService(GenericGrpcService): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print(f'Message delivery failed: {err}') + # print(f'Message delivery failed: {err}') else: LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - print(f'Message delivered to topic {msg.topic()}') + # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 3314477168bd0a2b20fc8d6dda5f82fb84016a32..8bbde9769ae1dfb16a33ef528f74031d2ba94c01 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -31,7 +31,7 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_RunRequestListener(): - LOGGER.info('test_RunRequestListener') - TelemetryBackendServiceObj = TelemetryBackendService() - threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file +# def test_RunRequestListener(): +# LOGGER.info('test_RunRequestListener') +# TelemetryBackendServiceObj = TelemetryBackendService() +# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index ad99dff12dc641232972f8cff8226878caefd71b..5c569e2ddd1d75dd89f88fe9ae08517330470254 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -153,7 +153,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print('Message delivery failed: {:}'.format(err)) + # print('Message delivery failed: {:}'.format(err)) # else: # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) # print('Message delivered to topic {:}'.format(msg.topic())) @@ -177,7 +177,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {:}".format(receive_msg.error())) + LOGGER.error("Consumer error: {:}".format(receive_msg.error())) break try: collector_id = receive_msg.key().decode('utf-8') @@ -185,13 +186,17 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): kpi_value = json.loads(receive_msg.value().decode('utf-8')) self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) except Exception as e: - print(f"Error extarcting msg key or value: {str(e)}") + # print(f"Error extarcting msg key or value: {str(e)}") + LOGGER.info("Error extarcting msg key or value: {:}".format(e)) continue def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: - print ("Backend termination confirmation for collector id: ", collector_id) + # print ("Backend termination confirmation for collector id: ", collector_id) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) else: - print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) + # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)