Skip to content
Snippets Groups Projects
Commit 88caa1cf authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics

**Backend:**
- Added a dictionary to manage running analyzers.
- Implemented logic to manage running analyzers.
- Added the `TerminateAnalyzerBackend` method to handle analyzer termination.

**Frontend:**
- Modified and invoked the `PublishStopRequestOnKafka` method.
parent 88d7a200
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
......@@ -29,13 +29,14 @@ class AnalyticsBackendService(GenericGrpcService):
Class listens for ...
"""
def __init__(self, cls_name : str = __name__) -> None:
    """Initialize the analytics backend service.

    Sets up the registry of running analyzer threads and the Kafka consumer
    used by the request listener.

    :param cls_name: logical name for this service instance (defaults to the
        module name).
    """
    # NOTE(review): no super().__init__() call is visible here although the
    # class extends GenericGrpcService — confirm the base class is initialized.
    self.running_threads = {} # To keep track of all running analyzers
    # NOTE(review): group.id 'analytics-frontend' in the *backend* service
    # looks copy-pasted — verify the intended consumer group.
    self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                         'group.id'          : 'analytics-frontend',
                                         'auto.offset.reset' : 'latest'})
def RunSparkStreamer(self, analyzer_id, analyzer):
    """Start a SparkStreamer worker thread for the given analyzer.

    Registers the (thread, stop_event) pair in ``self.running_threads`` under
    ``analyzer_id`` so the analyzer can later be stopped by
    ``TerminateAnalyzerBackend``.

    :param analyzer_id: unique identifier (UUID string) of the analyzer.
    :param analyzer: dict describing the analyzer; must contain the keys
        'input_kpis', 'thresholds', 'window_size' and 'window_slider'.
    :returns: True when the worker thread was created and started,
        False on any error.
    """
    kpi_list = analyzer['input_kpis']
    # TODO: update this line — operator names are derived by stripping the
    # '_value' suffix from the threshold keys (e.g. 'avg_value' -> 'avg').
    oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]
    thresholds = analyzer['thresholds']
    window_size = analyzer['window_size']
    window_slider = analyzer['window_slider']
    LOGGER.debug("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
        kpi_list, oper_list, thresholds, window_size, window_slider))
    try:
        stop_event = threading.Event()
        thread = threading.Thread(
            target=SparkStreamer,
            args=(kpi_list, oper_list, thresholds, window_size, window_slider, None,
                  stop_event))
        self.running_threads[analyzer_id] = (thread, stop_event)
        # BUG FIX: thread.start() was commented out, so the analyzer never ran
        # and a later TerminateAnalyzerBackend would call join() on an
        # unstarted thread, raising RuntimeError.
        thread.start()
        LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_id))
        return True
    except Exception as e:
        LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e))
        return False
def RunRequestListener(self)->bool:
    """Spawn the Kafka request listener on a background thread.

    NOTE(review): the thread handle is discarded and no return statement is
    visible in this view despite the declared ``bool`` return type —
    presumably a ``return True`` follows outside this excerpt; confirm.
    """
    threading.Thread(target=self.RequestListener).start()
......@@ -69,8 +79,30 @@ class AnalyticsBackendService(GenericGrpcService):
print("Consumer error: {}".format(receive_msg.error()))
break
analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
print('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
# TODO: Add active analyzer to list
self.RunSparkStreamer(analyzer)
analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
if analyzer["algo_name"] is None and analyzer["oper_mode"] is None:
self.TerminateAnalyzerBackend(analyzer_uuid)
else:
self.RunSparkStreamer(analyzer_uuid, analyzer)
def TerminateAnalyzerBackend(self, analyzer_uuid):
    """Stop and unregister the analyzer thread identified by *analyzer_uuid*.

    Signals the analyzer's stop event, waits for its thread to finish and
    removes the entry from ``self.running_threads``.

    :param analyzer_uuid: identifier under which the analyzer was registered.
    :returns: True when the analyzer was found and terminated, False when it
        was unknown or termination failed.
    """
    # Guard clause: unknown analyzer — nothing to stop.
    if analyzer_uuid not in self.running_threads:
        LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid))
        # TODO: generate confirmation towards frontend
        return False  # BUG FIX: this branch previously fell through returning None
    try:
        thread, stop_event = self.running_threads[analyzer_uuid]
        stop_event.set()  # cooperative shutdown signal observed by the worker
        # NOTE(review): unbounded wait — consider join(timeout) so a hung
        # analyzer cannot block the request listener forever.
        thread.join()
        del self.running_threads[analyzer_uuid]
        LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid))
        return True
    except Exception as e:
        # BUG FIX: the original format string had a single "{:}" placeholder
        # for two arguments, silently dropping the exception from the log.
        LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e))
        return False
......@@ -94,21 +94,21 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
self.db_obj.delete_db_row_by_id(
AnalyzerModel, "analyzer_id", analyzer_id_to_delete
)
self.PublishStopRequestOnKafka(analyzer_id_to_delete)
except Exception as e:
LOGGER.warning('Unable to delete analyzer. Error: {:}'.format(e))
self.PublishStopRequestOnKafka(request)
LOGGER.error('Unable to delete analyzer. Error: {:}'.format(e))
return Empty()
def PublishStopRequestOnKafka(self, analyzer_id):
def PublishStopRequestOnKafka(self, analyzer_uuid):
"""
Method to generate stop analyzer request on Kafka.
"""
analyzer_uuid = analyzer_id.analyzer_id.uuid
# analyzer_uuid = analyzer_id.analyzer_id.uuid
analyzer_to_stop : Dict = {
"algo_name" : -1,
"algo_name" : None,
"input_kpis" : [],
"output_kpis" : [],
"oper_mode" : -1
"oper_mode" : None
}
self.kafka_producer.produce(
KafkaTopic.ANALYTICS_REQUEST.value,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment