diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 463442f82bd2c6a53786581e8bd89553c499edc8..1e0c8a15bd85213042838c0ad7f59e4c7cfceb3d 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService):
             'group.id'          : 'analytics-frontend',
             'auto.offset.reset' : 'latest'})
 
-    def StartSparkStreamer(self, analyzer_id, analyzer):
+    def StartSparkStreamer(self, analyzer_uuid, analyzer):
         kpi_list   = analyzer['input_kpis']
         oper_list  = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]  # TODO: update this line...
         thresholds = analyzer['thresholds']
@@ -47,12 +47,12 @@ class AnalyticsBackendService(GenericGrpcService):
         try:
             stop_event = threading.Event()
             thread = threading.Thread(target=SparkStreamer,
-                                      args=(kpi_list, oper_list, thresholds, stop_event,
+                                      args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
                                             window_size, window_slider, None ))
-            self.running_threads[analyzer_id] = (thread, stop_event)
+            self.running_threads[analyzer_uuid] = (thread, stop_event)
             thread.start()
-            print ("Initiated Analyzer backend: {:}".format(analyzer_id))
-            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
+            print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
+            LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
             return True
         except Exception as e:
             print ("Failed to initiate Analyzer backend: {:}".format(e))
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index eaabcfed22d37053cc1289ab9a488e34369cbc92..96e1aa05d898ffdd23c533b74ee87fbf03f54576 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -74,7 +74,7 @@ def ApplyThresholds(aggregated_df, thresholds):
     )
     return aggregated_df
 
-def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
+def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
                   window_size=None, win_slide_duration=None, time_stamp_col=None):
     """
    Method to perform Spark operation Kafka stream.
@@ -128,7 +128,7 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
 
         # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
         query = thresholded_stream_data \
-            .selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
+            .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
             .writeStream \
             .format("kafka") \
             .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
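Note on the two backend changes above: the analyzer UUID is now threaded from `StartSparkStreamer` into `SparkStreamer` as `key`, and the Spark job uses it (instead of the per-record `kpi_id`) as the Kafka message key for everything it writes. Every record an analyzer produces therefore shares one key, which is what the frontend listener introduced later in this patch matches on. A minimal consumer-side sketch of the effect; the topic name, broker address, and UUID below are placeholders, not values taken from this patch:

```python
# Minimal sketch: consume analyzer results and filter on the message key.
import json
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers' : 'localhost:9092',        # assumed broker address
    'group.id'          : 'analytics-response-demo',
    'auto.offset.reset' : 'latest',
})
consumer.subscribe(['tfs-analytics-response'])     # assumed response topic name

analyzer_uuid = 'e3b0c442-...'                     # the key passed to SparkStreamer
while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # After this patch, the key is the analyzer UUID rather than a kpi_id,
    # so one comparison selects every record a given analyzer produced.
    if msg.key() and msg.key().decode('utf-8') == analyzer_uuid:
        print(json.loads(msg.value().decode('utf-8')))
```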
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index c3e00df35a9c9e476afcd4c797e4875c94043443..2f40faba94ef7081db609116e8fd869e3d119a24 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -32,13 +32,14 @@ def test_validate_kafka_topics():
     response = KafkaTopic.create_all_topics()
     assert isinstance(response, bool)
 
-def test_StartRequestListener():
-    LOGGER.info('test_RunRequestListener')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
-    LOGGER.debug(str(response))
-    assert isinstance(response, tuple)
+# def test_StartRequestListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, tuple)
 
+# To test START and STOP communication together
 def test_StopRequestListener():
     LOGGER.info('test_RunRequestListener')
     LOGGER.info('Initiating StartRequestListener...')
@@ -52,11 +53,12 @@ def test_StopRequestListener():
     LOGGER.debug(str(response))
     assert isinstance(response, bool)
 
-def test_SparkListener():
-    LOGGER.info('test_RunRequestListener')
-    AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.RunSparkStreamer(
-        get_kpi_id_list(), get_operation_list(), get_threshold_dict()
-        )
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
+# To independently test the SparkListener functionality
+# def test_SparkListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer(
+#         get_kpi_id_list(), get_operation_list(), get_threshold_dict()
+#         )
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in
index 1d22df11b4032c05ba851e9d64e5ca3786ecc461..6bf3d7c266e6006deb1fb88289b0b187abd677d6 100644
--- a/src/analytics/frontend/requirements.in
+++ b/src/analytics/frontend/requirements.in
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+apscheduler==3.10.*   # currently resolves to 3.10.4
 confluent-kafka==2.3.*
 psycopg2-binary==2.9.*
 SQLAlchemy==1.4.*
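The new `apscheduler` pin backs the response listener introduced in the next file. The pattern it enables, sketched below under assumed names: a `BackgroundScheduler` fires a polling job on its own worker thread at a fixed interval, so the thread that scheduled the job is never blocked between polls, and a single `shutdown()` call stops the polling.

```python
# Minimal sketch of the APScheduler pattern used by the servicer below.
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

def poll_once():
    print("polling...")    # stand-in for one consumer.poll() pass

scheduler = BackgroundScheduler()
scheduler.add_job(poll_once, trigger=IntervalTrigger(seconds=5),
                  id='demo_poller', replace_existing=True)
scheduler.start()          # job runs on a scheduler-owned thread
time.sleep(12)             # the job fires roughly twice in this window
scheduler.shutdown()       # the same call StopListener() uses
```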
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index f35f035e2fc004f0553c0a471bfcb8cd3d7699be..8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 
-import logging, grpc, json
+import logging, grpc, json, queue
 from typing import Dict
 
 from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError   # needed by response_listener() below
@@ -27,22 +27,24 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
 from analytics.database.Analyzer_DB import AnalyzerDB
 from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
-
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
 
 LOGGER           = logging.getLogger(__name__)
 METRICS_POOL     = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
-ACTIVE_ANALYZERS = [] # In case of sevice restarts, the list can be populated from the DB.
 
 class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
     def __init__(self):
         LOGGER.info('Init AnalyticsFrontendService')
+        self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
         self.db_obj         = AnalyzerDB()
+        self.result_queue   = queue.Queue()
+        self.scheduler      = BackgroundScheduler()
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'analytics-frontend',
                                              'auto.offset.reset' : 'latest'})
-
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartAnalyzer(self,
                       request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
@@ -80,9 +82,64 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             callback = self.delivery_callback
         )
         LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
-        ACTIVE_ANALYZERS.append(analyzer_uuid)
         self.kafka_producer.flush()
+
+        # self.StartResponseListener(analyzer_uuid)
+
+    def StartResponseListener(self, filter_key=None):
+        """
+        Start the Kafka response listener with APScheduler and return key-value pairs periodically.
+        """
+        LOGGER.info("Starting StartResponseListener")
+        # Schedule the ResponseListener at fixed intervals
+        self.scheduler.add_job(
+            self.response_listener,
+            trigger=IntervalTrigger(seconds=5),
+            args=[filter_key],
+            id=f"response_listener_{self.listener_topic}",
+            replace_existing=True
+        )
+        self.scheduler.start()
+        LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
+        try:
+            while True:
+                LOGGER.info("entering while...")
+                key, value = self.result_queue.get()  # Wait until a result is available
+                LOGGER.info("In while true ...")
+                yield key, value  # Yield the result to the calling function
+        except KeyboardInterrupt:
+            LOGGER.warning("Listener stopped manually.")
+        finally:
+            self.StopListener()
+
+    def response_listener(self, filter_key=None):
+        """
+        Poll Kafka messages and put key-value pairs into the queue.
+ """ + LOGGER.info(f"Polling Kafka topic {self.listener_topic}...") + + consumer = self.kafka_consumer + consumer.subscribe([self.listener_topic]) + msg = consumer.poll(2.0) + if msg is None: + return + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + return + + try: + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.info(f"Skipping message with unmatched key: {key}") + # value = json.loads(msg.value().decode('utf-8')) # Added for debugging + # self.result_queue.put((filter_key, value)) # Added for debugging + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopAnalyzer(self, @@ -118,11 +175,15 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid)) self.kafka_producer.flush() - try: - ACTIVE_ANALYZERS.remove(analyzer_uuid) - except ValueError: - LOGGER.warning('Analyzer ID {:} not found in active analyzers'.format(analyzer_uuid)) + self.StopListener() + def StopListener(self): + """ + Gracefully stop the Kafka listener and the scheduler. + """ + LOGGER.info("Stopping Kafka listener...") + self.scheduler.shutdown() + LOGGER.info("Kafka listener stopped.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectAnalyzers(self, @@ -147,7 +208,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print('Message delivery failed: {:}'.format(err)) + print ('Message delivery failed: {:}'.format(err)) # else: # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index b96116d29a16a8c327242a50cd6d8dabd106bfeb..d2428c01fb021f71a884d9a99c446bfef6e66559 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -13,8 +13,11 @@ # limitations under the License. 
diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index b96116d29a16a8c327242a50cd6d8dabd106bfeb..d2428c01fb021f71a884d9a99c446bfef6e66559 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -13,8 +13,11 @@
 # limitations under the License.
 
 import os
+import time
+import json
 import pytest
 import logging
+import threading
 
 from common.Constants import ServiceNameEnum
 from common.proto.context_pb2 import Empty
@@ -27,6 +30,10 @@ from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFronten
 from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
 from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
                                                 create_analyzer_filter )
+from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+
 
 ###########################
 # Tests Setup
@@ -83,20 +90,45 @@ def test_validate_kafka_topics():
     assert isinstance(response, bool)
 
 # ----- core funtionality test -----
-def test_StartAnalytics(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StartAnalytic START: <<< ')
-    response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
-    LOGGER.debug(str(response))
-    assert isinstance(response, AnalyzerId)
-
-def test_SelectAnalytics(analyticsFrontend_client):
-    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
-    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+# def test_StartAnalytics(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_StartAnalytic START: <<< ')
+#     response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, AnalyzerId)
+
+# To test start and stop listener together
+def test_StartStopAnalyzers(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
+    LOGGER.info('--> StartAnalyzer')
+    added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
+    LOGGER.debug(str(added_analyzer_id))
+    LOGGER.info(' --> Calling StartResponseListener... ')
+    class_obj = AnalyticsFrontendServiceServicerImpl()
+    response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
+    LOGGER.debug(response)
+    LOGGER.info("waiting for timer to complete ...")
+    time.sleep(3)
+    LOGGER.info('--> StopAnalyzer')
+    response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
     LOGGER.debug(str(response))
-    assert isinstance(response, AnalyzerList)
 
-def test_StopAnalytic(analyticsFrontend_client):
-    LOGGER.info(' >>> test_StopAnalytic START: <<< ')
-    response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
-    LOGGER.debug(str(response))
-    assert isinstance(response, Empty)
+# def test_SelectAnalytics(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+#     response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, AnalyzerList)
+
+# def test_StopAnalytic(analyticsFrontend_client):
+#     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
+#     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, Empty)
+
+# def test_ResponseListener():
+#     LOGGER.info(' >>> test_ResponseListener START <<< ')
+#     analyzer_id = create_analyzer_id()
+#     LOGGER.debug("Starting Response Listener for Analyzer ID: {:}".format(analyzer_id.analyzer_id.uuid))
+#     class_obj = AnalyticsFrontendServiceServicerImpl()
+#     for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
+#         LOGGER.debug(response)
+#         assert isinstance(response, tuple)
\ No newline at end of file
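A caveat on `test_StartStopAnalyzers` as written: since `StartResponseListener` is a generator, the bare call above only creates the generator object and the scheduler never starts before `StopAnalyzer` runs. One way the test could actually exercise the listener, sketched under the assumption that the backend publishes at least one keyed response within the timeout (the test file already imports `threading`):

```python
# Drain the listener on a daemon thread so the scheduler runs while the
# main thread proceeds to StopAnalyzer; names reuse the test's locals.
def drain_listener(servicer, analyzer_uuid, results):
    for key, value in servicer.StartResponseListener(analyzer_uuid):
        results.append((key, value))   # first matching response
        break

results = []
worker = threading.Thread(
    target=drain_listener,
    args=(class_obj, added_analyzer_id.analyzer_id.uuid, results),
    daemon=True)
worker.start()
worker.join(timeout=10)   # assumption: 10 s suffices for one response
assert results, "no keyed response received from the backend"
```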