Commit 617b5666 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Minor changes in Analytics and KpiValueAPI

- "FAIL" is changed to "FALL" in Spark streamer.
- start() was missing in RequestListener call.
- updated test file in KpiValueAPI
parent 705e74ec
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -79,7 +79,7 @@ class AnalyticsBackendService(GenericGrpcService):
            return False

    def install_servicers(self):
        threading.Thread(target=self.RequestListener, args=())
        threading.Thread(target=self.RequestListener, args=()).start()

    def RequestListener(self):
        """
+2 −2
Original line number Diff line number Diff line
@@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds):
    for col_name, (fail_th, raise_th) in thresholds.items():
        # Apply TH-Fail condition (if column value is less than the fail threshold)
        aggregated_df = aggregated_df.withColumn(
            f"{col_name}_THRESHOLD_FAIL", 
            f"{col_name}_THRESHOLD_FALL", 
            when(col(col_name) < fail_th, True).otherwise(False)
        )
        # Apply TH-RAISE condition (if column value is greater than the raise threshold)
@@ -128,7 +128,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event,

        # --- This will write output to Kafka: ACTUAL IMPLEMENTATION
        query = thresholded_stream_data \
            .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \
            .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \
+2 −2
Original line number Diff line number Diff line
@@ -21,8 +21,8 @@ from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from kpi_value_api.service.KpiValueApiService import KpiValueApiService
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from kpi_value_api.tests.messages import create_kpi_value_list

from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request
from common.proto.kpi_value_api_pb2 import KpiAlarms

LOCAL_HOST = '127.0.0.1'
KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)  # type: ignore