Skip to content
Snippets Groups Projects
Commit 88d7a200 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics Service.

- Thresholds, window_size, window_slider is added in Frontend and Backend.
parent 5eed4743
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -22,7 +22,7 @@ from common.tools.kafka.Variables import KafkaConfig, KafkaTopic ...@@ -22,7 +22,7 @@ from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError from confluent_kafka import KafkaError
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class AnalyticsBackendService(GenericGrpcService): class AnalyticsBackendService(GenericGrpcService):
""" """
...@@ -33,11 +33,18 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -33,11 +33,18 @@ class AnalyticsBackendService(GenericGrpcService):
'group.id' : 'analytics-frontend', 'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'}) 'auto.offset.reset' : 'latest'})
def RunSparkStreamer(self, kpi_list, oper_list, thresholds_dict): def RunSparkStreamer(self, analyzer):
print ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict)) kpi_list = analyzer['input_kpis']
LOGGER.debug ("Received parameters: {:} - {:} - {:}".format(kpi_list, oper_list, thresholds_dict)) oper_list = [s.replace('_value', '') for s in list(analyzer["thresholds"].keys())]
thresholds = analyzer['thresholds']
window_size = analyzer['window_size']
window_slider = analyzer['window_slider']
print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format(
kpi_list, oper_list, thresholds, window_size, window_slider))
threading.Thread(target=SparkStreamer, threading.Thread(target=SparkStreamer,
args=(kpi_list, oper_list, None, None, thresholds_dict, None) args=(kpi_list, oper_list, thresholds, window_size, window_slider, None)
).start() ).start()
return True return True
...@@ -63,6 +70,7 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -63,6 +70,7 @@ class AnalyticsBackendService(GenericGrpcService):
break break
analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_id = receive_msg.key().decode('utf-8') analyzer_id = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Collector: {:} - {:}'.format(analyzer_id, analyzer)) LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
print('Recevied Collector: {:} - {:} - {:}'.format(analyzer_id, analyzer, analyzer['input_kpis'])) print('Recevied Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
self.RunSparkStreamer(analyzer['input_kpis']) # TODO: Add active analyzer to list # TODO: Add active analyzer to list
self.RunSparkStreamer(analyzer)
...@@ -73,7 +73,8 @@ def ApplyThresholds(aggregated_df, thresholds): ...@@ -73,7 +73,8 @@ def ApplyThresholds(aggregated_df, thresholds):
) )
return aggregated_df return aggregated_df
def SparkStreamer(kpi_list, oper_list, thresholds, window_size=None, win_slide_duration=None, time_stamp_col=None): def SparkStreamer(kpi_list, oper_list, thresholds,
window_size=None, win_slide_duration=None, time_stamp_col=None):
""" """
Method to perform Spark operation Kafka stream. Method to perform Spark operation Kafka stream.
NOTE: Kafka topic to be processesd should have atleast one row before initiating the spark session. NOTE: Kafka topic to be processesd should have atleast one row before initiating the spark session.
......
...@@ -26,7 +26,7 @@ def get_threshold_dict(): ...@@ -26,7 +26,7 @@ def get_threshold_dict():
'max_value' : (45, 50), 'max_value' : (45, 50),
'first_value' : (00, 10), 'first_value' : (00, 10),
'last_value' : (40, 50), 'last_value' : (40, 50),
'stddev_value' : (00, 10), 'stdev_value' : (00, 10),
} }
# Filter threshold_dict based on the operation_list # Filter threshold_dict based on the operation_list
return { return {
......
...@@ -64,10 +64,14 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ...@@ -64,10 +64,14 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
""" """
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = { analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name, "algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode "oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"],
"window_slider" : analyzer_obj.parameters["window_slider"],
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
} }
self.kafka_producer.produce( self.kafka_producer.produce(
KafkaTopic.ANALYTICS_REQUEST.value, KafkaTopic.ANALYTICS_REQUEST.value,
......
...@@ -33,10 +33,10 @@ def create_analyzer(): ...@@ -33,10 +33,10 @@ def create_analyzer():
_kpi_id = KpiId() _kpi_id = KpiId()
# input IDs to analyze # input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id) _create_analyzer.input_kpi_ids.append(_kpi_id)
...@@ -47,8 +47,8 @@ def create_analyzer(): ...@@ -47,8 +47,8 @@ def create_analyzer():
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter # parameter
_threshold_dict = { _threshold_dict = {
'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stddev_value':(00, 10)} 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment