Skip to content
Snippets Groups Projects
Commit 821e14f8 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes to Analytics Backend:

- Corrected the name of `install_servicers()`.
- Removed `stop_event` from the `RequestedListener` call.
- Spark Streamer now writes to the `ALARMS` topic.
- Improved messages and test files.
parent c0059504
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!266Resolve: "Unable to correctly extract the aggregation function names from the dictionary received as parameters in the Analyzer message"
...@@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService):
LOGGER.error("Failed to terminate analytics backend {:}".format(e)) LOGGER.error("Failed to terminate analytics backend {:}".format(e))
return False return False
def install_services(self): def install_servicers(self):
stop_event = threading.Event() threading.Thread(target=self.RequestListener, args=())
thread = threading.Thread(target=self.RequestListener,
args=(stop_event,) )
thread.start()
return (thread, stop_event)
def RequestListener(self, stop_event): def RequestListener(self):
""" """
listener for requests on Kafka topic. listener for requests on Kafka topic.
""" """
LOGGER.info("Request Listener is initiated ...")
consumer = self.kafka_consumer consumer = self.kafka_consumer
consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
while not stop_event.is_set(): while True:
receive_msg = consumer.poll(2.0) receive_msg = consumer.poll(2.0)
if receive_msg is None: if receive_msg is None:
continue continue
...@@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService): ...@@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService):
else: else:
print("Consumer error: {}".format(receive_msg.error())) print("Consumer error: {}".format(receive_msg.error()))
break break
analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer = json.loads(receive_msg.value().decode('utf-8'))
analyzer_uuid = receive_msg.key().decode('utf-8') analyzer_uuid = receive_msg.key().decode('utf-8')
LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer))
......
...@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm ...@@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm
return { return {
# "kafka.bootstrap.servers": '127.0.0.1:9092', # "kafka.bootstrap.servers": '127.0.0.1:9092',
"kafka.bootstrap.servers": KafkaConfig.get_kafka_address(), "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
"subscribe" : KafkaTopic.VALUE.value, "subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session
"startingOffsets" : 'latest', "startingOffsets" : 'latest',
"failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss "failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss
} }
...@@ -132,7 +132,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, ...@@ -132,7 +132,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,
.writeStream \ .writeStream \
.format("kafka") \ .format("kafka") \
.option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
.option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \ .option("topic", KafkaTopic.ALARMS.value) \
.option("checkpointLocation", "analytics/.spark/checkpoint") \ .option("checkpointLocation", "analytics/.spark/checkpoint") \
.outputMode("update") .outputMode("update")
......
...@@ -12,6 +12,11 @@ ...@@ -12,6 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode,
Analyzer )
def get_kpi_id_list(): def get_kpi_id_list():
return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"]
...@@ -32,3 +37,37 @@ def get_threshold_dict(): ...@@ -32,3 +37,37 @@ def get_threshold_dict():
return { return {
op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict
} }
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# input IDs to analyze
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
# output IDs after analysis
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
return _create_analyzer
\ No newline at end of file
...@@ -12,12 +12,14 @@ ...@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time import time, json
from typing import Dict
import logging import logging
import threading import threading
from common.tools.kafka.Variables import KafkaTopic from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService
from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict
from .messages import create_analyzer
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -32,6 +34,24 @@ def test_validate_kafka_topics(): ...@@ -32,6 +34,24 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics() response = KafkaTopic.create_all_topics()
assert isinstance(response, bool) assert isinstance(response, bool)
def test_StartSparkStreamer():
LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ")
analyzer_obj = create_analyzer()
analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid
analyzer_to_generate : Dict = {
"algo_name" : analyzer_obj.algorithm_name,
"input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids],
"output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids],
"oper_mode" : analyzer_obj.operation_mode,
"thresholds" : json.loads(analyzer_obj.parameters["thresholds"]),
"window_size" : analyzer_obj.parameters["window_size"],
"window_slider" : analyzer_obj.parameters["window_slider"],
# "store_aggregate" : analyzer_obj.parameters["store_aggregate"]
}
AnalyticsBackendServiceObj = AnalyticsBackendService()
response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate)
assert isinstance(response, bool)
# def test_StartRequestListener(): # def test_StartRequestListener():
# LOGGER.info('test_RunRequestListener') # LOGGER.info('test_RunRequestListener')
# AnalyticsBackendServiceObj = AnalyticsBackendService() # AnalyticsBackendServiceObj = AnalyticsBackendService()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment