From 617b566658197b5f6647c3069778eeec75526dd2 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 20 Sep 2024 10:44:01 +0000 Subject: [PATCH] Minor changes in Analytics and KpiValueAPI - "FAIL" is changed to "FALL" in Spark streamer. - start() was missing in RequestListener call. - updated test file in KpiValueAPI --- src/analytics/backend/service/AnalyticsBackendService.py | 2 +- src/analytics/backend/service/SparkStreaming.py | 4 ++-- src/kpi_value_api/tests/test_kpi_value_api.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 4af90cf17..658d23795 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -79,7 +79,7 @@ class AnalyticsBackendService(GenericGrpcService): return False def install_servicers(self): - threading.Thread(target=self.RequestListener, args=()) + threading.Thread(target=self.RequestListener, args=()).start() def RequestListener(self): """ diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index cebfeb829..f204c6247 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds): for col_name, (fail_th, raise_th) in thresholds.items(): # Apply TH-Fail condition (if column value is less than the fail threshold) aggregated_df = aggregated_df.withColumn( - f"{col_name}_THRESHOLD_FAIL", + f"{col_name}_THRESHOLD_FALL", when(col(col_name) < fail_th, True).otherwise(False) ) # Apply TH-RAISE condition (if column value is greater than the raise threshold) @@ -128,7 +128,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, # --- This will write output to Kafka: ACTUAL IMPLEMENTATION query = thresholded_stream_data \ - .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \ + .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index 307b5cdad..ea6b22585 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -21,8 +21,8 @@ from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from kpi_value_api.service.KpiValueApiService import KpiValueApiService from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient -from kpi_value_api.tests.messages import create_kpi_value_list - +from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request +from common.proto.kpi_value_api_pb2 import KpiAlarms LOCAL_HOST = '127.0.0.1' KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore -- GitLab