Commit 5bb242b3 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

feat: add select handler function

parent 2b5ebb93
Loading
Loading
Loading
Loading
+7 −7
Original line number Diff line number Diff line
@@ -29,13 +29,13 @@ class Handlers(Enum):
    def is_valid_handler(cls, handler_name):
        return handler_name in cls._value2member_map_

    def select_handler(self, handler_name):
        if handler_name == self.AGGREGATION_HANDLER:
            return 'aggregation_handler'
        elif handler_name == self.AGGREGATION_HANDLER_THREE_TO_ONE:
            return 'aggregation_handler_three_to_one'
def select_handler(handler_name):
    if handler_name == "AggregationHandler":
        return aggregation_handler
    elif handler_name == "AggregationHandlerThreeToOne":
        return aggregation_handler_three_to_one
    else:
            return self.UNSUPPORTED_HANDLER
        return "UnsupportedHandler"

# This method is top-level and should not be part of the class due to serialization issues.
def threshold_handler(key, aggregated_df, thresholds):
+2 −2
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ import logging

from confluent_kafka                            import KafkaException, KafkaError
from common.tools.kafka.Variables               import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler
from analytics.backend.service.AnalyzerHelper   import AnalyzerHelper


@@ -114,7 +114,7 @@ class DaskStreamer(threading.Thread):
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
            if self.client is not None and self.client.status == 'running':
                try:
                    future = self.client.submit(Handlers.select_handler(self.thresholds["task_type"]), "batch size",
                    future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size",
                                                self.key,
                                                self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                    future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))