# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import json
import logging
import threading

from confluent_kafka import KafkaException, KafkaError
from common.tools.kafka.Variables import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
from analytics.backend.service.AnalyzerHelper import AnalyzerHelper

logger = logging.getLogger(__name__)
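
# Note: DaskStreamer consumes KPI samples from Kafka, accumulates them into
# batches (by size or elapsed time), hands each batch to a Dask worker for
# aggregation/threshold evaluation, and publishes the results to the ALARMS
# Kafka topic.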
class DaskStreamer(threading.Thread):
    def __init__(self, key, input_kpis, output_kpis, thresholds,
                 batch_size = 5,
                 batch_duration = 60,   # seconds; this parameter was missing from the signature, the default of 60 s is an assumption
                 window_size = None,
                 cluster_instance = None,
                 producer_instance = AnalyzerHelper.initialize_kafka_producer()  # Note: default argument is evaluated once, at import time
                 ):
        super().__init__()
        self.key = key
        self.input_kpis = input_kpis
        self.output_kpis = output_kpis
        self.thresholds = thresholds
        self.window_size = window_size          # TODO: Not implemented
        self.batch_size = batch_size
        self.batch_duration = batch_duration
        self.running = True
        self.batch = []

        # Initialize Kafka and Dask components
        self.client = AnalyzerHelper.initialize_dask_client(cluster_instance)
        self.consumer = AnalyzerHelper.initialize_kafka_consumer()  # Single-threaded consumer
        self.producer = producer_instance

        logger.info("Dask Streamer initialized.")
    def run(self):
        """Main method to start the DaskStreamer."""
        try:
            logger.info("Starting Dask Streamer")
            last_batch_time = time.time()
            while True:
                if not self.consumer:
                    logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
                    break
                if not self.running:
                    logger.warning("Dask Streamer instance has been terminated. Exiting loop.")
                    break
                if not self.client:
                    logger.warning("Dask client is not running. Exiting loop.")
                    break

                message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    continue
                if message.error():
                    if message.error().code() == KafkaError._PARTITION_EOF:
                        logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                    elif message.error():
                        raise KafkaException(message.error())
                else:
                    try:
                        value = json.loads(message.value())
                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode message: {message.value()}")
                        continue
                    self.batch.append(value)

                # The batch-size check takes precedence over the batch-duration window
                if len(self.batch) >= self.batch_size:  # If no batch size is provided, the default batch size is used
                    logger.info(f"Processing based on batch size {self.batch_size}.")
                    self.task_handler_selector()
                    self.batch = []
                else:
                    # Process based on the elapsed batch duration (time window)
                    current_time = time.time()
                    if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                        logger.info(f"Processing based on batch duration {self.batch_duration} seconds.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time
        except Exception as e:
            logger.exception(f"Error in Dask streaming process: {e}")
        finally:
            logger.info(">>> Exiting Dask Streamer...")
    def task_handler_selector(self):
        """Select the task handler based on the task type."""
        logger.info(f"Batch to be processed: {self.batch}")
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
            if self.client is not None and self.client.status == 'running':
                try:
                    future = self.client.submit(aggregation_handler, "batch size", self.key,
                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                except Exception as e:
                    logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
            else:
                logger.warning("Dask client is not running. Skipping processing.")
        else:
            logger.warning(f"Unknown task type: {self.thresholds['task_type']}. Skipping processing.")
    def produce_result(self, result, destination_topic):
        """Produce results to the Kafka topic."""
        if not result:
            logger.warning("Nothing to produce. Skipping.")
            return
        for record in result:
            try:
                self.producer.produce(
                    destination_topic,
                    key=str(record.get('kpi_id', '')),
                    value=json.dumps(record),
                    callback=AnalyzerHelper.delivery_report
                )
            except KafkaException as e:
                logger.error(f"Failed to produce message: {e}")
        self.producer.flush()
        logger.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.")
"""Clean up Kafka and Dask thread resources."""
if not self.running:
logger.info("Dask Streamer is already stopped.")
return
logger.info("Streamer running status is set to False. Waiting 5 seconds before stopping...")
time.sleep(5) # Waiting time for running tasks to complete
if self.consumer:
try:
self.consumer.close()
logger.info("Kafka consumer closed.")
except Exception as e:
logger.error(f"Error closing Kafka consumer: {e}")
if self.client is not None and hasattr(self.client, 'status') and self.client.status == 'running':
try:
self.client.close()
logger.info("Dask client closed.")
except Exception as e:
logger.error(f"Error closing Dask client: {e}")

# TODO: Maybe a single streamer for all analyzers ... ?
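
# Illustrative usage sketch (not part of the service flow; the backend service
# normally creates and manages DaskStreamer instances). The KPI IDs and threshold
# values below are hypothetical placeholders, and the "task_type" string must be
# one accepted by Handlers.is_valid_handler().
#
#   streamer = DaskStreamer(
#       key         = "analyzer-uuid-1234",                       # hypothetical analyzer key
#       input_kpis  = ["kpi-uuid-in-1", "kpi-uuid-in-2"],         # hypothetical input KPI IDs
#       output_kpis = ["kpi-uuid-out-1"],                         # hypothetical output KPI IDs
#       thresholds  = {"task_type": "<valid Handlers task type>", # must pass Handlers.is_valid_handler()
#                      "avg_value": (20.0, 90.0)},                # hypothetical threshold range
#       batch_size     = 10,
#       batch_duration = 30,
#   )
#   streamer.start()   # runs the Kafka consume/dispatch loop in its own thread
#   ...
#   streamer.stop()    # flip the running flag, close the consumer and Dask client
#   streamer.join()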