diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index d4a53745956f8007c7f7ad0859e44f046e5fe989..4af90cf17cf225ae77e5c31a6e39fa4d8ec3e99a 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -78,20 +78,17 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.error("Failed to terminate analytics backend {:}".format(e)) return False - def install_services(self): - stop_event = threading.Event() - thread = threading.Thread(target=self.RequestListener, - args=(stop_event,) ) - thread.start() - return (thread, stop_event) + def install_servicers(self): + threading.Thread(target=self.RequestListener, args=()) - def RequestListener(self, stop_event): + def RequestListener(self): """ listener for requests on Kafka topic. """ + LOGGER.info("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) - while not stop_event.is_set(): + while True: receive_msg = consumer.poll(2.0) if receive_msg is None: continue @@ -101,7 +98,7 @@ class AnalyticsBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index 96e1aa05d898ffdd23c533b74ee87fbf03f54576..cebfeb829937ed0c88de33ccc2f949d7f72d7f91 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -33,7 +33,7 @@ def SettingKafkaConsumerParams(): # TODO: create get_kafka_consumer() in comm return { # "kafka.bootstrap.servers": '127.0.0.1:9092', "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(), - "subscribe" : KafkaTopic.VALUE.value, + "subscribe" : KafkaTopic.VALUE.value, # topic should have atleast one message before spark session "startingOffsets" : 'latest', "failOnDataLoss" : 'false' # Optional: Set to "true" to fail the query on data loss } @@ -132,7 +132,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ - .option("topic", KafkaTopic.ANALYTICS_RESPONSE.value) \ + .option("topic", KafkaTopic.ALARMS.value) \ .option("checkpointLocation", "analytics/.spark/checkpoint") \ .outputMode("update") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index 9acd6ad9dffe4a5b10b107a6923ed85170ee141f..c3b78967efe13eef9a60e19e50e56bdfca4a410d 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid +import json +from common.proto.kpi_manager_pb2 import KpiId +from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, + Analyzer ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -32,3 +37,37 @@ def get_threshold_dict(): return { op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } + + +def create_analyzer(): + _create_analyzer = Analyzer() + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input IDs to analyze + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + _threshold_dict = { + # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet + + return _create_analyzer \ No newline at end of file diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 2f40faba94ef7081db609116e8fd869e3d119a24..9221bb23ee041da06a4c1f401c75d1906f6748b0 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time +import time, json +from typing import Dict import logging import threading from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict +from .messages import create_analyzer LOGGER = logging.getLogger(__name__) @@ -32,6 +34,24 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +def test_StartSparkStreamer(): + LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") + analyzer_obj = create_analyzer() + analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid + analyzer_to_generate : Dict = { + "algo_name" : analyzer_obj.algorithm_name, + "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], + "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], + "oper_mode" : analyzer_obj.operation_mode, + "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), + "window_size" : analyzer_obj.parameters["window_size"], + "window_slider" : analyzer_obj.parameters["window_slider"], + # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] + } + AnalyticsBackendServiceObj = AnalyticsBackendService() + response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) + assert isinstance(response, bool) + # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService()