Commit eaffff44 authored by Waleed Akbar
Browse files

Updated Analytics Backend with Streamer

- Refactor DaskStreamer by replacing "windows_size" with "batch_duration".
- Refactor aggregation_handler to support multiple KPI dataframes.
parent e33ba8b8
Loading
Loading
Loading
Loading
+7 −6
Original line number Diff line number Diff line
@@ -107,7 +107,8 @@ class AnalyticsBackendService(GenericGrpcService):
                input_kpis        = analyzer['input_kpis'        ],
                output_kpis       = analyzer['output_kpis'       ],
                thresholds        = analyzer['thresholds'        ],
                batch_size        = analyzer['batch_size' ],
                batch_size        = analyzer['batch_size_min'    ],
                batch_duration    = analyzer['batch_duration_min'],
                window_size       = analyzer['window_size'       ],
                cluster_instance  = self.cluster,
                producer_instance = self.central_producer,
@@ -119,7 +120,7 @@ class AnalyticsBackendService(GenericGrpcService):
            if analyzer['duration'] > 0:
                def stop_after_duration():
                    time.sleep(analyzer['duration'])
                    LOGGER.warning(f"Execution duration completed of Analyzer: {analyzer_uuid}")
                    LOGGER.warning(f"Execution duration ({analyzer['duration']}) completed of Analyzer: {analyzer_uuid}")
                    if not self.StopStreamer(analyzer_uuid):
                        LOGGER.warning("Failed to stop Dask Streamer. Streamer may be already terminated.")

+8 −3
Original line number Diff line number Diff line
@@ -95,6 +95,8 @@ def aggregation_handler(
            "sum"     : ('kpi_value', 'sum'),
        }

        results = []
        
        # Process each KPI-specific task parameter
        for kpi_index, kpi_id in enumerate(input_kpi_list):

@@ -122,10 +124,13 @@ def aggregation_handler(
                agg_df['kpi_id'] = output_kpi_list[kpi_index]

                # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
                result = threshold_handler(key, agg_df, kpi_task_parameters)
                record = threshold_handler(key, agg_df, kpi_task_parameters)

                return result.to_dict(orient='records')
                results.extend(record.to_dict(orient='records'))
            else:
                logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
                continue
        if results:
            return results
        else:
            return []
+14 −12
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
class DaskStreamer(threading.Thread):
    def __init__(self, key, input_kpis, output_kpis, thresholds, 
                 batch_size        = 5, 
                 batch_duration    = None,
                 window_size       = None,
                 cluster_instance  = None,
                 producer_instance = AnalyzerHelper.initialize_kafka_producer()
@@ -38,8 +39,9 @@ class DaskStreamer(threading.Thread):
        self.input_kpis     = input_kpis
        self.output_kpis    = output_kpis
        self.thresholds     = thresholds
        self.window_size = window_size
        self.window_size    = window_size      # TODO: Not implemented
        self.batch_size     = batch_size
        self.batch_duration = batch_duration
        self.running        = True
        self.batch          = []

@@ -65,7 +67,7 @@ class DaskStreamer(threading.Thread):
                if not self.client:
                    logger.warning("Dask client is not running. Exiting loop.")
                    break
                message = self.consumer.poll(timeout=2.0)
                message = self.consumer.poll(timeout=1.0)
                if message is None:
                    # logger.info("No new messages received.")
                    continue
@@ -83,7 +85,7 @@ class DaskStreamer(threading.Thread):
                    self.batch.append(value)

                # Batch duration takes precedence over batch size
                if self.window_size is None:
                if self.batch_duration is None:
                    if len(self.batch) >= self.batch_size:  # If batch size is not provided, process continue with the default batch size
                        logger.info(f"Processing based on batch size {self.batch_size}.")
                        self.task_handler_selector()
@@ -91,8 +93,8 @@ class DaskStreamer(threading.Thread):
                else:
                    # Process based on batch duration
                    current_time = time.time()
                    if (current_time - last_batch_time) >= self.window_size and self.batch:
                        logger.info(f"Processing based on window size {self.window_size}.")
                    if (current_time - last_batch_time) >= self.batch_duration and self.batch:
                        logger.info(f"Processing based on window size {self.batch_duration}.")
                        self.task_handler_selector()
                        self.batch = []
                        last_batch_time = current_time
+2 −2
Original line number Diff line number Diff line
@@ -32,13 +32,13 @@ def get_thresholds():
    }

def get_duration():
    # NOTE(review): there are two consecutive returns here. As written, only
    # the first one (40) ever executes and `return 90` is unreachable. This
    # looks like a rendered diff showing the removed line followed by the
    # added line -- confirm which value is intended and drop the other.
    return 40
    return 90

def get_windows_size():
    """Return the window size, which this helper fixes at ``None``."""
    window_size = None
    return window_size

def get_batch_size():
    # NOTE(review): there are two consecutive returns here. As written, only
    # the first one (10) ever executes and `return 5` is unreachable. This
    # looks like a rendered diff showing the removed line followed by the
    # added line -- confirm which value is intended and drop the other.
    return 10
    return 5

def get_interval():
    """Return the interval, which this helper fixes at 5."""
    interval = 5
    return interval