diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 4af90cf17cf225ae77e5c31a6e39fa4d8ec3e99a..658d237956b4ed3addbbc295ef0d19dd4b977257 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -79,7 +79,7 @@ class AnalyticsBackendService(GenericGrpcService): return False def install_servicers(self): - threading.Thread(target=self.RequestListener, args=()) + threading.Thread(target=self.RequestListener, args=()).start() def RequestListener(self): """ diff --git a/src/analytics/backend/service/SparkStreaming.py b/src/analytics/backend/service/SparkStreaming.py index cebfeb829937ed0c88de33ccc2f949d7f72d7f91..f204c6247436177cd032c777c048ecb165051ec2 100644 --- a/src/analytics/backend/service/SparkStreaming.py +++ b/src/analytics/backend/service/SparkStreaming.py @@ -64,7 +64,7 @@ def ApplyThresholds(aggregated_df, thresholds): for col_name, (fail_th, raise_th) in thresholds.items(): # Apply TH-Fail condition (if column value is less than the fail threshold) aggregated_df = aggregated_df.withColumn( - f"{col_name}_THRESHOLD_FAIL", + f"{col_name}_THRESHOLD_FALL", when(col(col_name) < fail_th, True).otherwise(False) ) # Apply TH-RAISE condition (if column value is greater than the raise threshold) @@ -128,7 +128,7 @@ def SparkStreamer(key, kpi_list, oper_list, thresholds, stop_event, # --- This will write output to Kafka: ACTUAL IMPLEMENTATION query = thresholded_stream_data \ - .selectExpr(f"'{key}' AS key", "to_json(struct(*)) AS value") \ + .selectExpr(f"CAST(kpi_id AS STRING) AS key", "to_json(struct(*)) AS value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", KafkaConfig.get_kafka_address()) \ diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index 307b5cdad4e6503a774e308f669fc44762f84bf1..ea6b22585a68d565c8162f1629a6a1ac4a4f6d6a 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -21,8 +21,8 @@ from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from kpi_value_api.service.KpiValueApiService import KpiValueApiService from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient -from kpi_value_api.tests.messages import create_kpi_value_list - +from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request +from common.proto.kpi_value_api_pb2 import KpiAlarms LOCAL_HOST = '127.0.0.1' KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore