Commit 07334879 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: Analytics Changes for Pluggable Use Case

- Update Kafka consumer initialization to accept dynamic group_id
- Enhance processing logic
parent f3bbb2dd
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -43,11 +43,12 @@ class AnalyzerHelper:
        return cluster

    @staticmethod
    def initialize_kafka_consumer():    # TODO: update to receive topic and group_id as parameters
        """Initialize the Kafka consumer."""
    def initialize_kafka_consumer(group_id='analytics-backend'):
        """Initialize the Kafka consumer with a unique group_id so each streamer
        receives its own copy of all messages from the topic."""
        consumer_conf = {
            'bootstrap.servers': KafkaConfig.get_kafka_address(),
            'group.id': 'analytics-backend',
            'group.id': group_id,
            'auto.offset.reset': 'latest'
        }
        consumer = Consumer(consumer_conf)
+9 −4
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ class DaskStreamer(threading.Thread):

        # Initialize Kafka and Dask components
        self.client   = AnalyzerHelper.initialize_dask_client(cluster_instance)
        self.consumer = AnalyzerHelper.initialize_kafka_consumer()      # Single-threaded consumer
        self.consumer = AnalyzerHelper.initialize_kafka_consumer(group_id=f'analytics-backend-{self.key}')
        self.producer = producer_instance

        logger.info("Dask Streamer initialized.")
@@ -106,10 +106,15 @@ class DaskStreamer(threading.Thread):
                        self.task_handler_selector()
                        self.batch = []
                else:
                    # Process based on window size
                    # Process based on window size OR batch size, whichever triggers first
                    current_time = time.time()
                    if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                    time_elapsed = (current_time - last_batch_time) >= self.batch_duration
                    batch_full = len(self.batch) >= self.batch_size
                    if (time_elapsed or batch_full) and self.batch:
                        if time_elapsed:
                            logger.info(f"Processing based on window size {self.batch_duration}.")
                        else:
                            logger.info(f"Processing based on batch size {self.batch_size} (before window size {self.batch_duration} elapsed).")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time