# Streamer.py
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
import json
from confluent_kafka import KafkaException, KafkaError
# import pandas as pd
from common.tools.kafka.Variables import KafkaTopic
from .AnalyzerHandlers import AnalyzerHandlers, aggregation_handler
from .AnalyzerHelper import AnalyzerHelper


# Configure the root logger exactly once. ``logging.basicConfig`` does nothing
# when the root logger already has a handler, so level and format have to be
# supplied in the same call for the format to take effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(funcName)s -  %(levelname)s - %(message)s',
)
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)


class DaskStreamer:
    """Poll a Kafka consumer, batch the decoded messages and submit them to Dask.

    Messages are JSON-decoded and accumulated in ``self.batch``. A batch is
    handed to ``aggregation_handler`` (through the Dask client) either when it
    holds ``batch_size`` entries or, if ``window_size`` is set, when at least
    ``window_size`` seconds have elapsed since the previous flush. The handler
    result is published to the ``KafkaTopic.ALARMS`` topic.
    """

    def __init__(self, key, input_kpis, output_kpis, thresholds, batch_size=5,
                 window_size=None, n_workers=5, threads_per_worker=2):
        """Store the configuration and create the Dask and Kafka components.

        Args:
            key: Forwarded unchanged to ``aggregation_handler``.
            input_kpis: Forwarded unchanged to ``aggregation_handler``.
            output_kpis: Forwarded unchanged to ``aggregation_handler``.
            thresholds: Mapping that must contain a ``"task_type"`` entry; also
                forwarded unchanged to ``aggregation_handler``.
            batch_size: Number of messages that triggers processing when no
                window is configured.
            window_size: Seconds between flushes. When not ``None`` it takes
                priority over ``batch_size``.
            n_workers: Passed to ``AnalyzerHelper.initialize_dask_client``.
            threads_per_worker: Passed to ``AnalyzerHelper.initialize_dask_client``.
        """
        self.key         = key
        self.input_kpis  = input_kpis
        self.output_kpis = output_kpis
        self.thresholds  = thresholds
        self.window_size = window_size
        self.batch_size  = batch_size
        self.n_workers   = n_workers
        self.threads_per_worker = threads_per_worker
        self.running     = True   # cleared by cleanup(); checked on every loop turn
        self.batch       = []     # decoded messages waiting to be processed

        # Initialize Kafka and Dask components
        self.client, self.cluster = AnalyzerHelper.initialize_dask_client(n_workers, threads_per_worker)
        self.consumer = AnalyzerHelper.initialize_kafka_consumer()
        self.producer = AnalyzerHelper.initialize_kafka_producer()
        logger.info("Dask Streamer initialized.")

    def run(self):
        """Run the poll/batch/process loop until stopped or until an error occurs.

        The loop ends when ``self.consumer`` is falsy, when ``self.running`` is
        false, or when an exception is raised (it is logged, not propagated).
        ``cleanup()`` is always called on the way out.
        """
        try:
            logger.info("Starting Dask Streamer")
            # Monotonic clock: elapsed-time checks must not be affected by
            # wall-clock adjustments.
            last_batch_time = time.monotonic()
            while True:
                if not self.consumer:
                    logger.warning("Kafka consumer is not initialized or stopped. Exiting loop.")
                    break
                if not self.running:
                    logger.warning("Dask Streamer is not running. Exiting loop.")
                    break

                message = self.consumer.poll(timeout=2.0)
                if message is not None:
                    self._handle_message(message)
                # No ``continue`` on an empty poll: the window check below has
                # to run even when the topic is idle, otherwise a pending batch
                # would wait for the next message before being flushed.

                # Window size has a priority over batch size
                if self.window_size is None:
                    if len(self.batch) >= self.batch_size:
                        logger.info(f"Processing based on batch size {self.batch_size}.")
                        self.task_handler_selector()
                        self.batch = []
                elif self.batch:
                    current_time = time.monotonic()
                    if (current_time - last_batch_time) >= self.window_size:
                        logger.info(f"Processing based on window size {self.window_size}.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time

        except Exception as e:
            logger.exception(f"Error in Dask streaming process: {e}")
        finally:
            logger.info(">>> Exiting Dask Streamer...")
            self.cleanup()
            logger.info(">>> Dask Streamer Cleanup Completed.")

    def _handle_message(self, message):
        """Decode one polled Kafka message and append it to the current batch.

        End-of-partition events are logged and ignored. Undecodable payloads
        are logged and dropped.

        Raises:
            KafkaException: For any consumer error other than end-of-partition.
        """
        error = message.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                logger.warning(f"Consumer reached end of topic {message.topic()}/{message.partition()}")
                return
            raise KafkaException(error)

        payload = message.value()
        try:
            value = json.loads(payload)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # TypeError covers a payload of None.
            logger.error(f"Failed to decode message: {payload}")
            return
        self.batch.append(value)

    def task_handler_selector(self):
        """Submit the current batch to Dask if the configured task type is valid.

        The batch is skipped (with a warning) when the task type is not
        accepted by ``AnalyzerHandlers.is_valid_handler`` or when the Dask
        client is not in the ``'running'`` state.
        """
        task_type = self.thresholds["task_type"]
        if not AnalyzerHandlers.is_valid_handler(task_type):
            logger.warning(f"Unknown task type: {task_type}. Skipping processing.")
            return
        if self.client.status != 'running':
            logger.warning("Dask client is not running. Skipping processing.")
            return

        # NOTE(review): the first argument is always "batch size", even when the
        # flush was triggered by the time window - confirm what the handler expects.
        future = self.client.submit(aggregation_handler, "batch size", self.key,
                                    self.batch, self.input_kpis, self.output_kpis, self.thresholds)
        future.add_done_callback(self._on_task_done)

    def _on_task_done(self, future):
        """Publish the result of a finished task, or log the failure."""
        try:
            result = future.result()
        except Exception as e:
            # A failed task must not raise inside the callback.
            logger.error(f"Aggregation task failed: {e}")
            return
        self.produce_result(result, KafkaTopic.ALARMS.value)

    def produce_result(self, result, destination_topic):
        """Produce results to the Kafka topic.

        Args:
            result: Sized iterable of records; each record must provide
                ``.get('kpi_id', ...)`` and be JSON-serializable. ``None`` is
                treated as "no records".
            destination_topic: Name of the topic to publish to.
        """
        records = [] if result is None else result
        queued = 0
        for record in records:
            try:
                self.producer.produce(
                    destination_topic,
                    key=str(record.get('kpi_id', '')),
                    value=json.dumps(record),
                    callback=AnalyzerHelper.delivery_report
                )
                queued += 1
            except (BufferError, KafkaException) as e:
                # BufferError: the producer's local queue is full.
                logger.error(f"Failed to produce message: {e}")
        self.producer.flush()
        logger.info(f"Produced {queued} aggregated records to '{destination_topic}'.")

    def cleanup(self):
        """Clean up Kafka and Dask resources.

        Each resource is released independently; a failure on one is logged
        and does not prevent the others from being released.
        """
        logger.info("Shutting down resources...")
        self.running = False
        if self.consumer:
            try:
                self.consumer.close()
                logger.info("Kafka consumer closed.")
            except Exception as e:
                logger.error(f"Error closing Kafka consumer: {e}")

        if self.producer:
            try:
                self.producer.flush()
                logger.info("Kafka producer flushed and closed.")
            except Exception as e:
                logger.error(f"Error closing Kafka producer: {e}")

        if self.client and hasattr(self.client, 'status') and self.client.status == 'running':
            try:
                self.client.close()
                logger.info("Dask client closed.")
            except Exception as e:
                logger.error(f"Error closing Dask client: {e}")

        if self.cluster and hasattr(self.cluster, 'close'):
            try:
                self.cluster.close(timeout=5)
                logger.info("Dask cluster closed.")
            except Exception as e:
                logger.error(f"May be timeout. Error closing Dask cluster: {e}")