diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 99a789c9fda481942b904a2e805202642123835c..722779824a8c91b5d7cff0337c10c4ea1327a1b6 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -18,6 +18,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index e30d30da623b2d0eee3d925d69a846b4b1f516a3..e74eb4ec198d77688f62004931c69eac31e60f0c 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -18,6 +18,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh index 3953d2a89c6fbe2bd3546e648246b9b018e5fdb0..96ac558bad5f0bf6bc6f5ee90a26cd11fda69273 100755 --- a/scripts/run_tests_locally-kpi-value-API.sh +++ b/scripts/run_tests_locally-kpi-value-API.sh @@ -19,8 +19,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc -KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") -KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") +# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +export KFK_SERVER_ADDRESS='127.0.0.1:9092' # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 367d91d97f5c3676d0e17dbfaa80f40abc09e9c1..f9c3b86a2eafe8a82691fa992027b026a1e9aa18 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -61,7 +61,7 @@ class AnalyticsBackendService(GenericGrpcService): else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) print ("Consumer error: {:}".format(receive_msg.error())) - continue + break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 0e2d49300ccf1cffed7f09435136055b8c70615e..706e180d5cf65107f5899315cdf75867beb81608 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', + 'group.id' : 'kpi-value-api-frontend', 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -152,9 +152,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): response.start_timestamp.timestamp = datetime.strptime( - value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() - response.end_timestamp.timestamp = datetime.strptime( - value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() response.kpi_id.kpi_id.uuid = value['kpi_id'] for key, threshold in value.items(): if key not in ['kpi_id', 'window']: @@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): key, value = self.result_queue.get() # Wait until a result is available LOGGER.info("In while true ...") yield key, value # Yield the result to the calling function - except KeyboardInterrupt: - LOGGER.warning("Listener stopped manually.") + except Exception as e: + LOGGER.warning("Listener stopped. Error: {:}".format(e)) finally: - self.StopListener() + self.scheduler.shutdown() def response_listener(self, filter_key=None): """ @@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): consumer = self.kafka_consumer consumer.subscribe([self.listener_topic]) - msg = consumer.poll(2.0) - if msg is None: - return - elif msg.error(): - if msg.error().code() != KafkaError._PARTITION_EOF: - LOGGER.error(f"Kafka error: {msg.error()}") - return - try: - key = msg.key().decode('utf-8') if msg.key() else None - if filter_key is not None and key == filter_key: - value = json.loads(msg.value().decode('utf-8')) - LOGGER.info(f"Received key: {key}, value: {value}") - self.result_queue.put((key, value)) - else: - LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") - except Exception as e: - LOGGER.error(f"Error processing Kafka message: {e}") + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + break + try: + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err))