Commit 067ace16 authored by Waleed Akbar

Changes in Telemetry

- Added a Kafka request listener to the Analytics backend that listens for analyzer requests from the Analytics frontend (the expected message shape is sketched after this list).
- Updated `SparkStreaming.py` to consume and process the stream from the `VALUE` topic and to filter rows to the KPIs in the `input_kpis` list.
- Updated the frontend to produce to the `ANALYTICS_REQUEST` topic instead of `VALUE`.
- Updated the test messages so the KPI UUIDs stay consistent with those generated by the Telemetry backend service.
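As a reference, an `ANALYTICS_REQUEST` message has roughly the shape sketched below. The field names follow the frontend's request dictionary and the `input_kpis` lookup in the backend listener; all values are illustrative placeholders, not taken from this commit.

# Illustrative sketch of an ANALYTICS_REQUEST message; all values are placeholders.
import json

analyzer_uuid = "c1a10180-ba28-4641-b190-2287bf440000"        # hypothetical analyzer UUID, used as the Kafka message key
analyzer_to_generate = {
    "algo_name"   : "some_algorithm",                         # hypothetical algorithm name
    "input_kpis"  : ["1e22f180-ba28-4641-b190-2287bf446666",  # KPI UUIDs as used in the test messages
                     "6e22f180-ba28-4641-b190-2287bf448888"],
    "output_kpis" : [],
    "oper_mode"   : 1                                         # hypothetical operation mode
}
message_value = json.dumps(analyzer_to_generate)              # produced with key=analyzer_uuid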
parent c8ad3a5c
Merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
@@ -12,18 +12,52 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import threading
+import json
+import logging
+import threading
 from common.tools.service.GenericGrpcService import GenericGrpcService
 from analytics.backend.service.SparkStreaming import SparkStreamer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
+
+LOGGER = logging.getLogger(__name__)

 class AnalyticsBackendService(GenericGrpcService):
     """
     Class that listens for analyzer requests over Kafka and runs Spark streamer threads for them.
     """
     def __init__(self, cls_name : str = __name__) -> None:
-        pass
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                             'group.id'          : 'analytics-frontend',
+                                             'auto.offset.reset' : 'latest'})

+    def RunSparkStreamer(self, kpi_list):
+        threading.Thread(target=SparkStreamer, args=(kpi_list,)).start()
+
+    def RunRequestListener(self)->bool:
+        threading.Thread(target=self.RequestListener).start()
+        return True
+
-    def RunSparkStreamer(self):
-        threading.Thread(target=SparkStreamer).start()
+    def RequestListener(self):
+        """
+        Listener for analyzer requests on the Kafka ANALYTICS_REQUEST topic.
+        """
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value])
+        while True:
+            receive_msg = consumer.poll(2.0)
+            if receive_msg is None:
+                continue
+            elif receive_msg.error():
+                if receive_msg.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    print("Consumer error: {}".format(receive_msg.error()))
+                    break
+            analyzer    = json.loads(receive_msg.value().decode('utf-8'))
+            analyzer_id = receive_msg.key().decode('utf-8')
+            LOGGER.debug('Received Analyzer: {:} - {:}'.format(analyzer_id, analyzer))
+            print('Received Analyzer: {:} - {:} - {:}'.format(analyzer_id, analyzer, analyzer['input_kpis']))
+            self.RunSparkStreamer(analyzer['input_kpis'])   # TODO: Add active analyzer to list
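The listener above loops until a consumer error breaks it. A possible refinement, not part of this commit and sketched here under the assumption that a stop flag is acceptable, is to make the thread stoppable and close the consumer cleanly:

# Sketch (assumption, not in this commit): a stoppable variant of RequestListener.
import threading

class StoppableListener:
    def __init__(self, consumer, topic):
        self._consumer = consumer             # a configured confluent_kafka Consumer
        self._topic    = topic
        self._stop     = threading.Event()

    def listen(self):
        self._consumer.subscribe([self._topic])
        while not self._stop.is_set():
            msg = self._consumer.poll(2.0)    # returns None on timeout
            if msg is None or msg.error():
                continue
            # ... decode the key/value and dispatch, as in RequestListener() ...
        self._consumer.close()                # leave the consumer group cleanly

    def stop(self):
        self._stop.set()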
@@ -15,7 +15,7 @@
 import logging
 from pyspark.sql import SparkSession
-from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
+from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 from pyspark.sql.functions import from_json, col
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -32,20 +32,19 @@ def SettingKafkaParameters():   # TODO: create get_kafka_consumer() in common w
     return {
         # "kafka.bootstrap.servers": '127.0.0.1:9092',
         "kafka.bootstrap.servers": KafkaConfig.get_kafka_address(),
-        "subscribe"       : KafkaTopic.ANALYTICS_REQUEST.value,
-        "startingOffsets" : 'latest',
+        "subscribe"       : KafkaTopic.VALUE.value,
+        "startingOffsets" : 'latest',
+        "failOnDataLoss"  : 'false'   # Optional: Set to "true" to fail the query on data loss
     }

 def DefiningRequestSchema():
     return StructType([
-        StructField("algo_name"  , StringType()            , True),
-        StructField("input_kpis" , ArrayType(StringType()) , True),
-        StructField("output_kpis", ArrayType(StringType()) , True),
-        StructField("oper_mode"  , IntegerType()           , True)
+        StructField("time_stamp" , StringType() , True),
+        StructField("kpi_id"     , StringType() , True),
+        StructField("kpi_value"  , DoubleType() , True)
     ])

-def SparkStreamer():
+def SparkStreamer(kpi_list):
     """
     Method to perform Spark operations on a Kafka stream.
     NOTE: The Kafka topic to be processed should have at least one row before initiating the Spark session.
@@ -68,9 +67,11 @@ def SparkStreamer():
     parsed_stream_data = stream_data.withColumn("parsed_value", from_json(col("value"), schema))
     # Select the parsed fields
     final_stream_data = parsed_stream_data.select("parsed_value.*")
+    # Filter the stream to only include rows where the kpi_id is in the kpi_list
+    filtered_stream_data = final_stream_data.filter(col("kpi_id").isin(kpi_list))
     # Start the Spark streaming query
-    query = final_stream_data \
+    query = filtered_stream_data \
         .writeStream \
         .outputMode("append") \
         .format("console")   # You can change this to other output modes or sinks
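The rest of this function is collapsed in the diff. For context, a Structured Streaming query built this way is typically started and awaited as in the sketch below; this is an assumption based on the standard PySpark API, not taken from the collapsed code.

# Sketch (assumption): starting and awaiting the streaming query built above.
streaming_query = query.start()        # 'query' is the DataStreamWriter chained above
streaming_query.awaitTermination()     # block until the streaming query stops or fails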
@@ -30,8 +30,15 @@ LOGGER = logging.getLogger(__name__)
 # response = KafkaTopic.create_all_topics()
 # assert isinstance(response, bool)

-def test_SparkListener():
+def test_RunRequestListener():
+    LOGGER.info('test_RunRequestListener')
     AnalyticsBackendServiceObj = AnalyticsBackendService()
-    response = AnalyticsBackendServiceObj.RunSparkStreamer()
+    response = AnalyticsBackendServiceObj.RunRequestListener()
     LOGGER.debug(str(response))
+
+# def test_SparkListener():
+#     LOGGER.info('test_RunRequestListener')
+#     AnalyticsBackendServiceObj = AnalyticsBackendService()
+#     response = AnalyticsBackendServiceObj.RunSparkStreamer()
+#     LOGGER.debug(str(response))
@@ -70,7 +70,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             "oper_mode" : analyzer_obj.operation_mode
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_generate),
             callback = self.delivery_callback
@@ -107,7 +107,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
            "oper_mode" : -1
         }
         self.kafka_producer.produce(
-            KafkaTopic.VALUE.value,
+            KafkaTopic.ANALYTICS_REQUEST.value,
             key      = analyzer_uuid,
             value    = json.dumps(analyzer_to_stop),
             callback = self.delivery_callback
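Both produce() calls hand over `self.delivery_callback`, which is outside this diff. With confluent_kafka, a delivery callback receives `(err, msg)` and is invoked from the producer's poll()/flush(); the body below is a hypothetical sketch, not the actual implementation.

# Sketch (assumption; the real delivery_callback is not shown in this diff).
def delivery_callback(err, msg):
    if err is not None:
        LOGGER.error('Message delivery failed: {:}'.format(err))
    else:
        LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))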
@@ -32,6 +32,10 @@ def create_analyzer():
     _kpi_id = KpiId()
     # input IDs to analyze
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
     _create_analyzer.input_kpi_ids.append(_kpi_id)
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
     _create_analyzer.input_kpi_ids.append(_kpi_id)
@@ -25,10 +25,10 @@ LOGGER = logging.getLogger(__name__)
 ###########################
 # --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)

 def test_RunRequestListener():
     LOGGER.info('test_RunRequestListener')