Commit a0986dcc authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

feat: Based on the threshold type select handler method.

parent 34ebe9f0
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -22,12 +22,21 @@ logger = logging.getLogger(__name__)

class Handlers(Enum):
    AGGREGATION_HANDLER = "AggregationHandler"
    AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne"
    UNSUPPORTED_HANDLER = "UnsupportedHandler"

    @classmethod
    def is_valid_handler(cls, handler_name):
        return handler_name in cls._value2member_map_

    def select_handler(self, handler_name):
        if handler_name == self.AGGREGATION_HANDLER:
            return 'aggregation_handler'
        elif handler_name == self.AGGREGATION_HANDLER_THREE_TO_ONE:
            return 'aggregation_handler_three_to_one'
        else:
            return self.UNSUPPORTED_HANDLER

# This method is top-level and should not be part of the class due to serialization issues.
def threshold_handler(key, aggregated_df, thresholds):
    """
+8 −14
Original line number Diff line number Diff line
@@ -113,20 +113,14 @@ class DaskStreamer(threading.Thread):
        logger.info(f"Batch to be processed: {self.batch}")
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
            if self.client is not None and self.client.status == 'running':
                if any('total' in d for d in self.thresholds.get('task_parameter', [])):
                try:
                        future = self.client.submit(aggregation_handler_three_to_one, "batch size", self.key,
                    future = self.client.submit(Handlers.select_handler(self.thresholds["task_type"]), "batch size",
                                                self.key,
                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                except Exception as e:
                        logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
                else:
                    try:
                        future = self.client.submit(aggregation_handler, "batch size", self.key,
                                                        self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                        future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                    except Exception as e:
                        logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
                    logger.error(
                        f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
            else:
                logger.warning("Dask client is not running. Skipping processing.")
        else: