Commit c1ee28db authored by Waleed Akbar

Changes in Analytics

Backend:
- Updated the `StartSparkStreamer` function call to send `analyzer_uuid` as the key for messages produced on the Kafka topic.
- Updated the `SparkStreamer` definition to accept the key and include it in the `writeStream` output (a minimal sketch of the keyed writer follows below).
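
A minimal sketch (not the committed code) of how a fixed message key can be attached to every record a Spark Structured Streaming query writes to Kafka, mirroring the change above; the function name, bootstrap address, topic, and checkpoint path are illustrative placeholders:

```python
from pyspark.sql import DataFrame
from pyspark.sql.streaming import StreamingQuery

def write_stream_with_key(stream_df: DataFrame, key: str) -> StreamingQuery:
    # Attach `key` (e.g. the analyzer UUID) as the Kafka message key of every
    # output record and serialize the full row as the JSON value.
    return (stream_df
        .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")        # placeholder address
        .option("topic", "tfs_analytics_response")               # placeholder topic
        .option("checkpointLocation", "/tmp/checkpoints/tfs")    # required by the Kafka sink
        .start())
```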

Frontend:
- Integrated APScheduler to manage the `StreamListener`.
- Added `ResponseListener` to consume messages from the `analytics_response_topic` and process them (see the sketch below).
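
A minimal sketch, assuming `apscheduler` and `confluent-kafka`, of the scheduler-driven polling pattern introduced here: a background job polls the response topic at a fixed interval and pushes matching key/value pairs onto a queue. The bootstrap address, topic name, interval, and analyzer UUID are illustrative placeholders:

```python
import json
import queue

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from confluent_kafka import Consumer

result_queue = queue.Queue()
consumer = Consumer({'bootstrap.servers' : 'kafka:9092',   # placeholder address
                     'group.id'          : 'analytics-frontend',
                     'auto.offset.reset' : 'latest'})
consumer.subscribe(['tfs_analytics_response'])              # placeholder topic name

def poll_response_topic(filter_key=None):
    # Poll a single message; enqueue (key, value) when the key matches the requested analyzer.
    msg = consumer.poll(2.0)
    if msg is None or msg.error():
        return
    key = msg.key().decode('utf-8') if msg.key() else None
    if filter_key is None or key == filter_key:
        result_queue.put((key, json.loads(msg.value().decode('utf-8'))))

scheduler = BackgroundScheduler()
scheduler.add_job(poll_response_topic, trigger=IntervalTrigger(seconds=5),
                  args=['<analyzer-uuid>'], id='response_listener', replace_existing=True)
scheduler.start()

# Callers can then block on result_queue.get(), as the frontend servicer does.
```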
parent 8ac130c1
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
@@ -34,7 +34,7 @@ class AnalyticsBackendService(GenericGrpcService):
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
def StartSparkStreamer(self, analyzer_id, analyzer):
def StartSparkStreamer(self, analyzer_uuid, analyzer):
kpi_list = analyzer['input_kpis']
oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())] # TODO: update this line...
thresholds = analyzer['thresholds']
@@ -47,12 +47,12 @@ class AnalyticsBackendService(GenericGrpcService):
try:
stop_event = threading.Event()
thread = threading.Thread(target=SparkStreamer,
args=(kpi_list, oper_list, thresholds, stop_event,
args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event,
window_size, window_slider, None ))
self.running_threads[analyzer_id] = (thread, stop_event)
self.running_threads[analyzer_uuid] = (thread, stop_event)
thread.start()
print ("Initiated Analyzer backend: {:}".format(analyzer_id))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
print ("Initiated Analyzer backend: {:}".format(analyzer_uuid))
LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid))
return True
except Exception as e:
print ("Failed to initiate Analyzer backend: {:}".format(e))
@@ -74,7 +74,7 @@ def ApplyThresholds(aggregated_df, thresholds):
)
return aggregated_df
def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
window_size=None, win_slide_duration=None, time_stamp_col=None):
"""
Method to perform Spark operations on a Kafka stream.
@@ -128,7 +128,7 @@ def SparkStreamer(kpi_list, oper_list, thresholds, stop_event,
# --- This will write output to Kafka: ACTUAL IMPLEMENTATION
query = thresholded_stream_data \
.selectExpr("CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
.selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
@@ -32,13 +32,14 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_StartRequestListener():
LOGGER.info('test_RunRequestListener')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
LOGGER.debug(str(response))
assert isinstance(response, tuple)
# def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event)
# LOGGER.debug(str(response))
# assert isinstance(response, tuple)
# To test START and STOP communication together
def test_StopRequestListener():
LOGGER.info('test_RunRequestListener')
LOGGER.info('Initiating StartRequestListener...')
@@ -52,11 +53,12 @@ def test_StopRequestListener():
LOGGER.debug(str(response))
assert isinstance(response, bool)
def test_SparkListener():
LOGGER.info('test_RunRequestListener')
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.RunSparkStreamer(
get_kpi_id_list(), get_operation_list(), get_threshold_dict()
)
LOGGER.debug(str(response))
assert isinstance(response, bool)
# To independently test the SparkListener functionality
# def test_SparkListener():
# LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService()
# response = AnalyticsBackendServiceObj.RunSparkStreamer(
# get_kpi_id_list(), get_operation_list(), get_threshold_dict()
# )
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.* # .4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
@@ -13,7 +13,7 @@
# limitations under the License.
import logging, grpc, json
import logging, grpc, json, queue
from typing import Dict
from confluent_kafka import Consumer as KafkaConsumer
@@ -27,22 +27,24 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
ACTIVE_ANALYZERS = [] # In case of service restarts, the list can be populated from the DB.
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
self.db_obj = AnalyzerDB()
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
@@ -80,9 +82,64 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
callback = self.delivery_callback
)
LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
ACTIVE_ANALYZERS.append(analyzer_uuid)
self.kafka_producer.flush()
# self.StartResponseListener(analyzer_uuid)
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
@@ -118,11 +175,15 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
self.kafka_producer.flush()
try:
ACTIVE_ANALYZERS.remove(analyzer_uuid)
except ValueError:
LOGGER.warning('Analyzer ID {:} not found in active analyzers'.format(analyzer_uuid))
self.StopListener()
def StopListener(self):
"""
Gracefully stop the Kafka listener and the scheduler.
"""
LOGGER.info("Stopping Kafka listener...")
self.scheduler.shutdown()
LOGGER.info("Kafka listener stopped.")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
@@ -147,7 +208,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def delivery_callback(self, err, msg):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
@@ -13,8 +13,11 @@
# limitations under the License.
import os
import time
import json
import pytest
import logging
import threading
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty
@@ -27,6 +30,10 @@ from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFronten
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
create_analyzer_filter )
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
###########################
# Tests Setup
@@ -83,20 +90,45 @@ def test_validate_kafka_topics():
assert isinstance(response, bool)
# ----- core functionality test -----
def test_StartAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalytic START: <<< ')
response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerId)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
LOGGER.info('--> StartAnalyzer')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
LOGGER.debug(response)
LOGGER.info("waiting for timer to comlete ...")
time.sleep(3)
LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# def test_SelectAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerList)
# def test_StopAnalytic(analyticsFrontend_client):
# LOGGER.info(' >>> test_StopAnalytic START: <<< ')
# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_ResponseListener():
# LOGGER.info(' >>> test_ResponseListener START <<< ')
# analyzer_id = create_analyzer_id()
# LOGGER.debug("Starting Response Listener for Analyzer ID: {:}".format(analyzer_id.analyzer_id.uuid))
# class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response)
# assert isinstance(response, tuple)
\ No newline at end of file