diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py
index 84f1887c628a5bbe9968c4d89ddf912f47e96ed6..5331d027dfeb9c3608ef7da0693758a6c925eb48 100755
--- a/src/analytics/backend/service/AnalyticsBackendService.py
+++ b/src/analytics/backend/service/AnalyticsBackendService.py
@@ -12,18 +12,52 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import threading
+import json
+import logging
+import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from analytics.backend.service.SparkStreaming import SparkStreamer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
 
+LOGGER = logging.getLogger(__name__)
 
 class AnalyticsBackendService(GenericGrpcService):
     """
     Class listens for ...
     """
     def __init__(self, cls_name : str = __name__) -> None:
-        pass
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                             'group.id'          : 'analytics-frontend',
+                                             'auto.offset.reset' : 'latest'})
+
+    def RunSparkStreamer(self, kpi_list):
+        threading.Thread(target=SparkStreamer, args=(kpi_list,)).start()
+
+    def RunRequestListener(self)->bool:
+        threading.Thread(target=self.RequestListener).start()
+        return True
 
-    def RunSparkStreamer(self):
-        threading.Thread(target=SparkStreamer).start()
+    def RequestListener(self):
+        """
+        Listener for analyzer requests on the Kafka topic.
+        """
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
+        while True:
+            receive_msg = consumer.poll(2.0)
+            if receive_msg is None:
+                continue
+            elif receive_msg.error():
+                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    print("Consumer error: {}".format(receive_msg.error()))
+                    break
+            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
+            analyzer_id = receive_msg.key().decode('utf-8')
+            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
+            print('Received Analyzer: {:} - {:} - {:}'.format(analyzer_id, analyzer, analyzer['input_kpis']))
+            self.RunSparkStreamer(analyzer['input_kpis'])    # TODO: Add active analyzer to list
 
diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py
index 92cc9a8429c1c1619b9469d0fba698857218c593..f42618a1ca5f9e52290f12a1f1eacaec480b293b 100644
--- a/src/analytics/backend/service/SparkStreaming.py
+++ b/src/analytics/backend/service/SparkStreaming.py
@@ -15,7 +15,7 @@
 
 import logging
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 from pyspark.sql.functions import from_json, col
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 
@@ -32,20 +32,19 @@ def SettingKafkaParameters(): # TODO: create get_kafka_consumer() in common w
     return {
         # "kafka.bootstrap.servers": '127.0.0.1:9092',
         "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
-        "subscribe"              : KafkaTopic.ANALYTICS_REQUEST.value,
-        "startingOffsets"        : 'latest',
+        "subscribe"              : KafkaTopic.VALUE.value,
+        "startingOffsets"        : 'latest',
         "failOnDataLoss"         : 'false'      # Optional: Set to "true" to fail the query on data loss
     }
 
 def DefiningRequestSchema():
     return StructType([
-            StructField("algo_name",   StringType()  , True),
-            StructField("input_kpis",  ArrayType(StringType()), True),
-            StructField("output_kpis", ArrayType(StringType()), True),
-            StructField("oper_mode",   IntegerType() , True)
+            StructField("time_stamp" , StringType()  , True),
+            StructField("kpi_id"     , StringType()  , True),
+            StructField("kpi_value"  , DoubleType()  , True)
         ])
 
-def SparkStreamer():
+def SparkStreamer(kpi_list):
     """
     Method to perform Spark operation Kafka stream.
     NOTE: Kafka topic to be processesd should have atleast one row before initiating the spark session.
@@ -68,9 +67,11 @@ def SparkStreamer():
         parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
         # Select the parsed fields
         final_stream_data = parsed_stream_data.select("parsed_value.*")
+        # Filter the stream to only include rows where the kpi_id is in the kpi_list
+        filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
 
         # Start the Spark streaming query
-        query = final_stream_data \
+        query = filtered_stream_data \
             .writeStream \
             .outputMode("append") \
             .format("console")          # You can change this to other output modes or sinks
diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py
index 7a6175ecf0f6b14663226c9d95b913754e010e0f..426c89e54e37da0d8373818fa9220e9f231ebe63 100644
--- a/src/analytics/backend/tests/test_backend.py
+++ b/src/analytics/backend/tests/test_backend.py
@@ -30,8 +30,15 @@ LOGGER = logging.getLogger(__name__)
 #     response = KafkaTopic.create_all_topics()
 #     assert isinstance(response, bool)
 
-def test_SparkListener():
+def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')
     AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.RunSparkStreamer()
+    response = AnalyticsBackendServiceObj.RunRequestListener()
     LOGGER.debug(str(response))
+
+
+# def test_SparkListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer()
+#     LOGGER.debug(str(response))
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index ccbef3599d13d703d225aaf7e7b60a3419bcea47..0f9f4e14657c4252744dce0f67c27ef8b1ff4e87 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -70,7 +70,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             "oper_mode" : analyzer_obj.operation_mode
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_generate),
             callback = self.delivery_callback
@@ -107,7 +107,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             "oper_mode" : -1
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_stop),
             callback = self.delivery_callback
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 4c826e5c336d05e57e183c538ca50bbcdeef164e..0a8300436c55b10120bba3c2c0b4c67f8ae03a7a 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -32,6 +32,10 @@ def create_analyzer():
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
"1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_TelemetryBackend.py index a2bbee540c3ce348ef52eceb0e776f48a68d94b1..24a1b35cc2244c54fdaa0042bad6c4d397d92af8 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_TelemetryBackend.py @@ -25,10 +25,10 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener')