From 6b136318cfd1395647747ed051342cf7eca299be Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 22 Sep 2024 06:54:15 +0000 Subject: [PATCH 01/12] Changes in New Monitoring: - Added scripts to run the analytics backend tests locally and to show the telemetry backend/frontend logs. - Added a YAML folded block scalar (`- >`) before the `docker run --name kafka` command in the `.gitlab-ci.yml` files of `kpi_value_api`, `kpi_value_writer`, and `telemetry`, so the multi-line command is parsed as a single shell command. - Removed Java from the `analytics` requirements file. --- .../run_tests_locally-analytics-backend.sh | 24 +++++++++++++++++ .../run_tests_locally-telemetry-backend.sh | 3 ++- .../run_tests_locally-telemetry-frontend.sh | 3 ++- scripts/show_logs_telemetry-backend.sh | 27 +++++++++++++++++++ scripts/show_logs_telemetry-frontend.sh | 27 +++++++++++++++++++ src/analytics/requirements.in | 1 - src/kpi_value_api/.gitlab-ci.yml | 3 ++- src/kpi_value_writer/.gitlab-ci.yml | 3 ++- src/telemetry/.gitlab-ci.yml | 3 ++- 9 files changed, 88 insertions(+), 6 deletions(-) create mode 100755 scripts/run_tests_locally-analytics-backend.sh create mode 100755 scripts/show_logs_telemetry-backend.sh create mode 100755 scripts/show_logs_telemetry-frontend.sh diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh new file mode 100755 index 000000000..99a789c9f --- /dev/null +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" +python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + analytics/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 79db05fcf..422574d0e 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -22,7 +22,8 @@ cd $PROJECTDIR/src # kpi_manager/tests/test_unitary.py # python3 kpi_manager/tests/test_unitary.py - +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ telemetry/backend/tests/test_TelemetryBackend.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index a2a1de523..0ed828310 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -22,7 +22,8 @@ cd $PROJECTDIR/src # kpi_manager/tests/test_unitary.py # python3 kpi_manager/tests/test_unitary.py - +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ telemetry/frontend/tests/test_frontend.py diff --git a/scripts/show_logs_telemetry-backend.sh b/scripts/show_logs_telemetry-backend.sh new file mode 100755 index 000000000..c28083dcb --- /dev/null +++ b/scripts/show_logs_telemetry-backend.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. 
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c backend diff --git a/scripts/show_logs_telemetry-frontend.sh b/scripts/show_logs_telemetry-frontend.sh new file mode 100755 index 000000000..821dc275b --- /dev/null +++ b/scripts/show_logs_telemetry-frontend.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. +export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/telemetryservice -c frontend diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in index 8ff30ddaa..cb5febf0d 100644 --- a/src/analytics/requirements.in +++ b/src/analytics/requirements.in @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-java==11.0.* pyspark==3.5.2 confluent-kafka==2.3.* psycopg2-binary==2.9.* diff --git a/src/kpi_value_api/.gitlab-ci.yml b/src/kpi_value_api/.gitlab-ci.yml index 1a6f821ba..fef96d5b5 100644 --- a/src/kpi_value_api/.gitlab-ci.yml +++ b/src/kpi_value_api/.gitlab-ci.yml @@ -61,7 +61,8 @@ unit_test kpi-value-api: docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 bitnami/zookeeper:latest - sleep 10 # Wait for Zookeeper to start - - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + - > + docker run --name kafka -d --network=teraflowbridge -p 9092:9092 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest diff --git a/src/kpi_value_writer/.gitlab-ci.yml b/src/kpi_value_writer/.gitlab-ci.yml index 9a2f9fd47..4b36165d0 100644 --- a/src/kpi_value_writer/.gitlab-ci.yml +++ b/src/kpi_value_writer/.gitlab-ci.yml @@ -61,7 +61,8 @@ unit_test kpi-value-writer: docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 bitnami/zookeeper:latest - sleep 10 # Wait for Zookeeper to start - - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + - > + docker run --name kafka -d --network=teraflowbridge -p 9092:9092 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest diff --git a/src/telemetry/.gitlab-ci.yml b/src/telemetry/.gitlab-ci.yml index 110a6490d..48fd2f493 100644 --- a/src/telemetry/.gitlab-ci.yml +++ b/src/telemetry/.gitlab-ci.yml @@ -71,7 +71,8 @@ unit_test telemetry-backend: docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 bitnami/zookeeper:latest - sleep 10 # Wait for Zookeeper to start - - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + - > + docker run --name kafka -d --network=teraflowbridge -p 9092:9092 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest -- GitLab From c0c517f032299f1879339ea18742c01ab594fb15 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 22 Sep 2024 16:33:57 +0000 Subject: [PATCH 02/12] Minor Changes in New Monitoring Services to Improve Overall Functionality - Added new tests in the `scripts` folder. - Improved messages and test files across all services. Telemetry and Analytics Backend: - Added a `try` statement to handle Kafka message decoding errors. Telemetry Frontend and KPI Value API: - Downgraded `apscheduler` from `3.10.4` to `3.10.1` in the requirements file. Kafka: - Added a method to retrieve `KAFKA_SERVER_ADDRESS` from environment variables. Telemetry Backend: - Added a condition to allow the collector backend to run indefinitely. - Changed the extracted `KpiValue` timestamp format to `"%Y-%m-%dT%H:%M:%SZ"` to meet Apache Spark requirements. - Removed some unused packages from the `requirements.in` file. Deployment Script: - Added `"telemetry"` and `"analytics"` component names to the `TFS_COMPONENTS` variable in the `my_deploy.sh` file. 
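For reference, a minimal sketch of the two helper behaviours described above: the Spark-compatible UTC timestamp and the environment-variable lookup for the Kafka server address. Helper names are illustrative and the fallback address template is an assumption; the actual changes are in `TelemetryBackendService.py` and `common/tools/kafka/Variables.py` below.

```python
import os
from datetime import datetime, timezone

def current_utc_timestamp() -> str:
    # Timestamp format expected by Apache Spark, e.g. "2024-09-22T16:33:57Z"
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def get_kafka_address() -> str:
    # Prefer an explicit KFK_SERVER_ADDRESS; otherwise derive the address from
    # the namespace and port settings (the template below is assumed here).
    address = os.environ.get('KFK_SERVER_ADDRESS')
    if address is None:
        namespace = os.environ['KFK_NAMESPACE']
        port      = os.environ['KFK_SERVER_PORT']
        address   = 'kafka-service.{:s}.svc.cluster.local:{:s}'.format(namespace, port)
    return address
```
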
--- my_deploy.sh | 2 +- scripts/run_tests_locally-kpi-DB.sh | 2 +- scripts/run_tests_locally-telemetry-DB.sh | 3 +- .../run_tests_locally-telemetry-backend.sh | 2 +- .../service/AnalyticsBackendService.py | 31 ++++++------ src/analytics/backend/tests/messages.py | 4 +- src/analytics/backend/tests/test_backend.py | 30 +++++------ src/analytics/frontend/requirements.in | 2 +- src/analytics/frontend/tests/messages.py | 5 +- src/analytics/frontend/tests/test_frontend.py | 50 +++++-------------- src/common/tools/kafka/Variables.py | 13 ++--- src/kpi_manager/tests/test_messages.py | 2 + src/kpi_value_api/requirements.in | 2 +- src/kpi_value_api/tests/test_kpi_value_api.py | 7 +++ .../service/TelemetryBackendService.py | 30 ++++++----- .../tests/{messagesBackend.py => messages.py} | 0 ...st_TelemetryBackend.py => test_backend.py} | 3 +- src/telemetry/frontend/tests/Messages.py | 13 +++-- src/telemetry/frontend/tests/test_frontend.py | 47 ++--------------- src/telemetry/requirements.in | 6 +-- 20 files changed, 106 insertions(+), 148 deletions(-) rename src/telemetry/backend/tests/{messagesBackend.py => messages.py} (100%) rename src/telemetry/backend/tests/{test_TelemetryBackend.py => test_backend.py} (91%) diff --git a/my_deploy.sh b/my_deploy.sh index 88be82b63..5b535a959 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate Monitoring Framework (new) -#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api" +#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics" # Uncomment to activate BGP-LS Speaker #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker" diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh index 4953b49e0..ad1b4c57b 100755 --- a/scripts/run_tests_locally-kpi-DB.sh +++ b/scripts/run_tests_locally-kpi-DB.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_db.py diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh index 4b9a41760..529774f2d 100755 --- a/scripts/run_tests_locally-telemetry-DB.sh +++ b/scripts/run_tests_locally-telemetry-DB.sh @@ -20,7 +20,8 @@ cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ # kpi_manager/tests/test_unitary.py - +CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \ telemetry/tests/test_telemetryDB.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 422574d0e..4867335a5 100755 --- 
a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -26,4 +26,4 @@ CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o js export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ - telemetry/backend/tests/test_TelemetryBackend.py + telemetry/backend/tests/test_backend.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 658d23795..96b1f5242 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -46,10 +46,10 @@ class AnalyticsBackendService(GenericGrpcService): thresholds = analyzer['thresholds'] window_size = analyzer['window_size'] window_slider = analyzer['window_slider'] - print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) - LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) + # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + # kpi_list, oper_list, thresholds, window_size, window_slider)) + # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + # kpi_list, oper_list, thresholds, window_size, window_slider)) try: stop_event = threading.Event() thread = threading.Thread(target=SparkStreamer, @@ -86,6 +86,7 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") + print ("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: @@ -98,17 +99,19 @@ class AnalyticsBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_uuid = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + try: + analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer_uuid = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.TerminateAnalyzerBackend(analyzer_uuid) - else: - self.StartSparkStreamer(analyzer_uuid, analyzer) - LOGGER.debug("Stop Event activated. Terminating...") - print ("Stop Event activated. Terminating...") + if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: + self.TerminateAnalyzerBackend(analyzer_uuid) + else: + self.StartSparkStreamer(analyzer_uuid, analyzer) + except Exception as e: + LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + print ("Unable to consume message from topic: {:}. 
ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) def TerminateAnalyzerBackend(self, analyzer_uuid): if analyzer_uuid in self.running_threads: diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index c3b78967e..e5faaa1f5 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -42,14 +42,14 @@ def get_threshold_dict(): def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze _kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 9221bb23e..c2e7fbe31 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -34,6 +34,7 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +# --- To test Start Streamer functionality --- def test_StartSparkStreamer(): LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") analyzer_obj = create_analyzer() @@ -52,26 +53,25 @@ def test_StartSparkStreamer(): response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) assert isinstance(response, bool) +# --- To TEST StartRequestListenerFunctionality # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) -# LOGGER.debug(str(response)) -# assert isinstance(response, tuple) +# threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start() # To test START and STOP communication together -def test_StopRequestListener(): - LOGGER.info('test_RunRequestListener') - LOGGER.info('Initiating StartRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) - # LOGGER.debug(str(response_thread)) - time.sleep(10) - LOGGER.info('Initiating StopRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) - LOGGER.debug(str(response)) - assert isinstance(response, bool) +# def test_StopRequestListener(): +# LOGGER.info('test_RunRequestListener') +# LOGGER.info('Initiating StartRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) +# # LOGGER.debug(str(response_thread)) +# time.sleep(10) +# LOGGER.info('Initiating StopRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = 
AnalyticsBackendServiceObj.StopRequestListener(response_thread) +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) # To independently tests the SparkListener functionality # def test_SparkListener(): diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in index d81b9ddbe..0b1ec921b 100644 --- a/src/analytics/frontend/requirements.in +++ b/src/analytics/frontend/requirements.in @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -apscheduler==3.10.4 +apscheduler==3.10.1 confluent-kafka==2.3.* psycopg2-binary==2.9.* SQLAlchemy==1.4.* diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 646de962e..eb25c33b0 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 9f5c040f3..1b4e0e14e 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -26,7 +26,6 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, from common.tools.kafka.Variables import KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue -from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, @@ -34,7 +33,7 @@ from analytics.frontend.tests.messages import ( create_analyze from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger - +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList ########################### # Tests Setup @@ -85,46 +84,23 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -# # ----- core funtionality test ----- -def test_StartAnalytics(analyticsFrontend_client): - 
LOGGER.info(' >>> test_StartAnalytic START: <<< ') - stream = analyticsFrontend_client.StartAnalyzer(create_analyzer()) - for response in stream: - LOGGER.debug(str(response)) - assert isinstance(response, KpiValue) - -# To test start and stop listener together def test_StartStopAnalyzers(analyticsFrontend_client): - LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ') - LOGGER.info('--> StartAnalyzer') + LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) - LOGGER.info('--> StopAnalyzer') - response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) + assert isinstance(added_analyzer_id, AnalyzerId) + +def test_StopAnalytic(analyticsFrontend_client): + LOGGER.info(' >>> test_StopAnalytic START: <<< ') + response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) LOGGER.debug(str(response)) + assert isinstance(response, Empty) -# def test_SelectAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_SelectAnalytics START: <<< ') -# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerList) - -# def test_StopAnalytic(analyticsFrontend_client): -# LOGGER.info(' >>> test_StopAnalytic START: <<< ') -# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_SelectAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_SelectAnalytics START: <<< ') + response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, AnalyzerList) # def test_ResponseListener(): # LOGGER.info(' >>> test_ResponseListener START <<< ') diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index b5cb3bbe0..9e432d637 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -25,11 +25,12 @@ class KafkaConfig(Enum): @staticmethod def get_kafka_address() -> str: - # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) - # if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + # kafka_server_address = "127.0.0.1:9092" return kafka_server_address @staticmethod @@ -90,4 +91,4 @@ class KafkaTopic(Enum): return False return True -# create all topics after the deployments (Telemetry and Analytics) +# TODO: create all topics after the deployments (Telemetry and Analytics) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 7b5c45859..08a2dbf73 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -27,6 +27,8 @@ def create_kpi_id_request(): 
def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index e95d6d8bb..0615fa833 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -15,4 +15,4 @@ confluent-kafka==2.3.* requests==2.27.* prometheus-api-client==0.5.3 -apscheduler==3.10.4 +apscheduler==3.10.1 diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index ea6b22585..c7eb8fef1 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -78,6 +78,13 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +# def test_GetKpiAlarms(kpi_value_api_client): +# LOGGER.debug(" >>> test_GetKpiAlarms") +# stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request()) +# for response in stream: +# LOGGER.debug(str(response)) +# assert isinstance(response, KpiAlarms) + def test_store_kpi_values(kpi_value_api_client): LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 078fa5896..f3cf18d65 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -17,7 +17,8 @@ import time import random import logging import threading -from typing import Any, Dict +from typing import Any, Dict +from datetime import datetime, timezone # from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -53,6 +54,8 @@ class TelemetryBackendService(GenericGrpcService): """ listener for requests on Kafka topic. 
""" + LOGGER.info('Telemetry backend request listener is running ...') + print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.REQUEST.value]) while True: @@ -65,16 +68,19 @@ class TelemetryBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - - collector = json.loads(receive_msg.value().decode('utf-8')) - collector_id = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + try: + collector = json.loads(receive_msg.value().decode('utf-8')) + collector_id = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) - else: - self.RunInitiateCollectorBackend(collector_id, collector) + if collector['duration'] == -1 and collector['interval'] == -1: + self.TerminateCollectorBackend(collector_id) + else: + self.RunInitiateCollectorBackend(collector_id, collector) + except Exception as e: + LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) def TerminateCollectorBackend(self, collector_id): if collector_id in self.running_threads: @@ -101,7 +107,7 @@ class TelemetryBackendService(GenericGrpcService): print("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): - if time.time() - start_time >= collector['duration']: # condition to terminate backend + if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. 
break @@ -140,7 +146,7 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { - "time_stamp": str(time.time()), + "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messages.py similarity index 100% rename from src/telemetry/backend/tests/messagesBackend.py rename to src/telemetry/backend/tests/messages.py diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_backend.py similarity index 91% rename from src/telemetry/backend/tests/test_TelemetryBackend.py rename to src/telemetry/backend/tests/test_backend.py index 665fa825e..4764d7f5f 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -34,5 +34,4 @@ LOGGER = logging.getLogger(__name__) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - LOGGER.debug(str(response)) + threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index a0e93e8a1..e6d8ef439 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -22,15 +22,18 @@ from common.proto.kpi_manager_pb2 import KpiId def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() # _collector_id.collector_id.uuid = str(uuid.uuid4()) - _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c" + _collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" return _collector_id def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() - _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.interval_s = float(random.randint(2, 4)) + # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) + _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" + # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = -1 + _create_collector_request.interval_s = float(random.randint(3, 5)) return _create_collector_request def create_collector_filter(): diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 9c3f9d3a8..c3f8091c8 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -105,47 +105,10 @@ def test_SelectCollectors(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorList) -# ----- Non-gRPC method tests ----- -def test_RunResponseListener(): - LOGGER.info(' >>> test_RunResponseListener START: <<< ') - TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() - response = TelemetryFrontendServiceObj.RunResponseListener() # 
becasue Method "run_kafka_listener" is not define in frontend.proto - LOGGER.debug(str(response)) - assert isinstance(response, bool) - -# ------- previous test ---------------- - -# def test_verify_db_and_table(): -# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') -# _engine = TelemetryEngine.get_engine() -# managementDB.create_database(_engine) -# managementDB.create_tables(_engine) - -# def test_StartCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StartCollector START: <<< ') -# response = telemetryFrontend_client.StartCollector(create_collector_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorId) - -# def test_run_kafka_listener(): -# LOGGER.info(' >>> test_run_kafka_listener START: <<< ') -# name_mapping = NameMapping() -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) -# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto +# # ----- Non-gRPC method tests ----- +# def test_RunResponseListener(): +# LOGGER.info(' >>> test_RunResponseListener START: <<< ') +# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() +# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto # LOGGER.debug(str(response)) # assert isinstance(response, bool) - -# def test_StopCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StopCollector START: <<< ') -# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) -# time.sleep(3) # wait for small amount before call the stopCollecter() -# response = telemetryFrontend_client.StopCollector(_collector_id) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) - -# def test_select_collectors(telemetryFrontend_client): -# LOGGER.info(' >>> test_select_collector requesting <<< ') -# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) -# LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorList) \ No newline at end of file diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in index a0e78d2bf..4e9981afb 100644 --- a/src/telemetry/requirements.in +++ b/src/telemetry/requirements.in @@ -12,13 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-anytree==2.8.0 APScheduler==3.10.1 -influx-line-protocol==0.1.4 psycopg2-binary==2.9.3 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2024.1 -questdb==1.0.1 -requests==2.27.1 -xmltodict==0.12.0 \ No newline at end of file +requests==2.27.1 \ No newline at end of file -- GitLab From 77551e25682b90407b87324e14f93acd92459722 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 22 Sep 2024 16:49:31 +0000 Subject: [PATCH 03/12] Gitlab-ci file added in Analytics --- src/analytics/.gitlab-ci.yml | 203 ++++++++++++++++++++++++++++++++++ src/analytics/requirements.in | 1 - 2 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 src/analytics/.gitlab-ci.yml diff --git a/src/analytics/.gitlab-ci.yml b/src/analytics/.gitlab-ci.yml new file mode 100644 index 000000000..33ea9f3cf --- /dev/null +++ b/src/analytics/.gitlab-ci.yml @@ -0,0 +1,203 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build, tag, and push the Docker image to the GitLab Docker registry +build analytics: + variables: + IMAGE_NAME: 'analytics' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + # This first build tags the builder resulting image to prevent being removed by dangling image removal command + # - docker buildx build -t "${IMAGE_NAME}-backend:${IMAGE_TAG}-builder" --target builder -f ./src/$IMAGE_NAME/backend/Dockerfile . + - docker buildx build -t "${IMAGE_NAME}-frontend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/frontend/Dockerfile . + - docker buildx build -t "${IMAGE_NAME}-backend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/backend/Dockerfile . 
+ - docker tag "${IMAGE_NAME}-frontend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker tag "${IMAGE_NAME}-backend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + after_script: + - docker images --filter="dangling=true" --quiet | xargs -r docker rmi + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/.gitlab-ci.yml + - src/$IMAGE_NAME/frontend/**/*.{py,in,yml} + - src/$IMAGE_NAME/frontend/Dockerfile + - src/$IMAGE_NAME/frontend/tests/*.py + - src/$IMAGE_NAME/backend/Dockerfile + - src/$IMAGE_NAME/backend/**/*.{py,in,yml} + - src/$IMAGE_NAME/backend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + +# Apply unit test to the component +unit_test analytics-backend: + variables: + IMAGE_NAME: 'analytics' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build analytics + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi + - if docker container ls | grep ${IMAGE_NAME}-backend; then docker rm -f ${IMAGE_NAME}-backend; else echo "${IMAGE_NAME}-backend container is not in the system"; fi + - docker container prune -f + script: + - docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG" + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - > + docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 10 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + - > + docker run --name $IMAGE_NAME-backend -d -p 30060:30060 + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/backend/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG + - docker ps -a + - sleep 5 + - docker logs ${IMAGE_NAME}-backend + - > + docker exec -i ${IMAGE_NAME}-backend bash -c + "coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-backend_report.xml $IMAGE_NAME/backend/tests/test_*.py" + - docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker network rm teraflowbridge + - docker volume prune --force + - docker image prune --force + - docker rm -f ${IMAGE_NAME}-backend + - docker rm -f 
zookeeper + - docker rm -f kafka + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/backend/**/*.{py,in,yml} + - src/$IMAGE_NAME/backend/Dockerfile + - src/$IMAGE_NAME/backend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/backend/tests/${IMAGE_NAME}-backend_report.xml + +# Apply unit test to the component +unit_test analytics-frontend: + variables: + IMAGE_NAME: 'analytics' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build analytics + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi + - if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi + - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi + - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi + - if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi + - docker container prune -f + script: + - docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG" + - docker pull "bitnami/zookeeper:latest" + - docker pull "bitnami/kafka:latest" + - docker pull "cockroachdb/cockroach:latest-v22.2" + - docker volume create crdb + - > + docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080 + --env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123 + --volume "crdb:/cockroach/cockroach-data" + cockroachdb/cockroach:latest-v22.2 start-single-node + - echo "Waiting for initialization..." + - while ! 
docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done + # - docker logs crdb + # - docker ps -a + - CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $CRDB_ADDRESS + - > + docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 \ + -e ALLOW_ANONYMOUS_LOGIN=yes \ + bitnami/zookeeper:latest + - sleep 10 # Wait for Zookeeper to start + - docker run --name kafka -d --network=teraflowbridge -p 9092:9092 + --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + --env ALLOW_PLAINTEXT_LISTENER=yes + bitnami/kafka:latest + - sleep 10 # Wait for Kafka to start + - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") + - echo $KAFKA_IP + # - docker logs zookeeper + # - docker logs kafka + - > + docker run --name $IMAGE_NAME-frontend -d -p 30050:30050 + --env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require" + --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092" + --volume "$PWD/src/$IMAGE_NAME/frontend/tests:/opt/results" + --network=teraflowbridge + $CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG + - docker ps -a + - sleep 5 + - docker logs ${IMAGE_NAME}-frontend + - > + docker exec -i ${IMAGE_NAME}-frontend bash -c + "coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-frontend_report.xml $IMAGE_NAME/frontend/tests/test_*.py" + - docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker volume rm -f crdb + - docker network rm teraflowbridge + - docker volume prune --force + - docker image prune --force + - docker rm -f ${IMAGE_NAME}-frontend + - docker rm -f zookeeper + - docker rm -f kafka + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/frontend/**/*.{py,in,yml} + - src/$IMAGE_NAME/frontend/Dockerfile + - src/$IMAGE_NAME/frontend/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml \ No newline at end of file diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in index cb5febf0d..231dc04e8 100644 --- a/src/analytics/requirements.in +++ b/src/analytics/requirements.in @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -pyspark==3.5.2 confluent-kafka==2.3.* psycopg2-binary==2.9.* SQLAlchemy==1.4.* -- GitLab From ca690230bfb270bc53400daa83f09d9d7915043e Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 22 Sep 2024 16:53:16 +0000 Subject: [PATCH 04/12] In Analytics Backend: Java installation verison is upgraded from 11 to 17. 
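A quick sanity check for the new runtime (illustrative; the container name `analytics-backend` is taken from the unit-test CI job): running `docker exec -i analytics-backend java -version` against the rebuilt image should now report an OpenJDK 17 runtime.
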
--- src/analytics/backend/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile index df5cd7fbd..ef49657cd 100644 --- a/src/analytics/backend/Dockerfile +++ b/src/analytics/backend/Dockerfile @@ -59,7 +59,7 @@ RUN apt-get update && \ apt-get clean # Set JAVA_HOME environment variable -ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 +ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 ENV PATH=$JAVA_HOME/bin:$PATH # Create component sub-folders, get specific Python packages -- GitLab From 87802cc7862cc0868e5e050fefcfb9039638abf3 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 22 Sep 2024 17:09:36 +0000 Subject: [PATCH 05/12] Update in KpiValueAPI: - Updated `GetKpiAlarms()` to return the calculated KPI value along with the threshold. --- src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 0f57f8821..0e2d49300 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -157,7 +157,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() response.kpi_id.kpi_id.uuid = value['kpi_id'] for key, threshold in value.items(): - if "THRESHOLD_" in key: + if key not in ['kpi_id', 'window']: response.alarms[key] = threshold yield response -- GitLab From dafc8e870c531e67258b311373cb54beb0e196d0 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 24 Sep 2024 14:12:54 +0000 Subject: [PATCH 06/12] Updates to Streamer Classes in Backend Analytics: - Replaced `SparkStreamer` class with `DaskStreamer` class. - Updated methods to call the `DaskStreamer` class. - Improved messages and test files. - Added new package requirements. - Made minor changes to the frontend `requirements` file. --- src/analytics/backend/requirements.in | 4 +- .../service/AnalyticsBackendService.py | 88 +++---- .../backend/service/DaskStreaming.py | 233 ++++++++++++++++++ src/analytics/backend/tests/messages.py | 45 +++- src/analytics/backend/tests/test_backend.py | 123 +++++++-- src/analytics/frontend/tests/messages.py | 9 +- 6 files changed, 423 insertions(+), 79 deletions(-) create mode 100644 src/analytics/backend/service/DaskStreaming.py diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in index 9df678fe8..04ab95c2d 100644 --- a/src/analytics/backend/requirements.in +++ b/src/analytics/backend/requirements.in @@ -12,5 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -pyspark==3.5.2 +dask==2024.9.0 +distributed==2024.9.0 +pandas==2.2.3 confluent-kafka==2.3.* diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 96b1f5242..367d91d97 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -12,18 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- +import time import json import logging import threading from common.tools.service.GenericGrpcService import GenericGrpcService -from analytics.backend.service.SparkStreaming import SparkStreamer from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc - +from threading import Thread, Event +from .DaskStreaming import DaskStreamer LOGGER = logging.getLogger(__name__) @@ -40,44 +40,6 @@ class AnalyticsBackendService(GenericGrpcService): 'group.id' : 'analytics-frontend', 'auto.offset.reset' : 'latest'}) - def StartSparkStreamer(self, analyzer_uuid, analyzer): - kpi_list = analyzer['input_kpis'] - oper_list = [s.split('_', 1)[0] for s in list(analyzer["thresholds"].keys())] # TODO: update this line... - thresholds = analyzer['thresholds'] - window_size = analyzer['window_size'] - window_slider = analyzer['window_slider'] - # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - # kpi_list, oper_list, thresholds, window_size, window_slider)) - # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - # kpi_list, oper_list, thresholds, window_size, window_slider)) - try: - stop_event = threading.Event() - thread = threading.Thread(target=SparkStreamer, - args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event, - window_size, window_slider, None )) - self.running_threads[analyzer_uuid] = (thread, stop_event) - thread.start() - print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) - return True - except Exception as e: - print ("Failed to initiate Analyzer backend: {:}".format(e)) - LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) - return False - - def StopRequestListener(self, threadInfo: tuple): - try: - thread, stop_event = threadInfo - stop_event.set() - thread.join() - print ("Terminating Analytics backend RequestListener") - LOGGER.info("Terminating Analytics backend RequestListener") - return True - except Exception as e: - print ("Failed to terminate analytics backend {:}".format(e)) - LOGGER.error("Failed to terminate analytics backend {:}".format(e)) - return False - def install_servicers(self): threading.Thread(target=self.RequestListener, args=()).start() @@ -97,8 +59,9 @@ class AnalyticsBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) - break + LOGGER.error("Consumer error: {:}".format(receive_msg.error())) + print ("Consumer error: {:}".format(receive_msg.error())) + continue try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') @@ -106,14 +69,44 @@ class AnalyticsBackendService(GenericGrpcService): print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.TerminateAnalyzerBackend(analyzer_uuid) + self.StopDaskListener(analyzer_uuid) else: - self.StartSparkStreamer(analyzer_uuid, analyzer) + self.StartDaskListener(analyzer_uuid, analyzer) except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) print ("Unable to consume message from topic: {:}. 
ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - def TerminateAnalyzerBackend(self, analyzer_uuid): + def StartDaskListener(self, analyzer_uuid, analyzer): + kpi_list = analyzer[ 'input_kpis' ] + thresholds = analyzer[ 'thresholds' ] + window_size = analyzer[ 'window_size' ] + window_slider = analyzer[ 'window_slider'] + + LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( + kpi_list, thresholds, window_size, window_slider)) + print ("Received parameters: {:} - {:} - {:} - {:}".format( + kpi_list, thresholds, window_size, window_slider)) + try: + stop_event = Event() + thread = Thread( + target=DaskStreamer, + # args=(analyzer_uuid, kpi_list, oper_list, thresholds, stop_event), + args=(analyzer_uuid, kpi_list, thresholds, stop_event), + kwargs={ + "window_size" : window_size, + } + ) + thread.start() + self.running_threads[analyzer_uuid] = (thread, stop_event) + print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + return True + except Exception as e: + print ("Failed to initiate Analyzer backend: {:}".format(e)) + LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) + return False + + def StopDaskListener(self, analyzer_uuid): if analyzer_uuid in self.running_threads: try: thread, stop_event = self.running_threads[analyzer_uuid] @@ -128,5 +121,4 @@ class AnalyticsBackendService(GenericGrpcService): return False else: print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) - LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) - # generate confirmation towards frontend + LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) diff --git a/src/analytics/backend/service/DaskStreaming.py b/src/analytics/backend/service/DaskStreaming.py new file mode 100644 index 000000000..f09da9949 --- /dev/null +++ b/src/analytics/backend/service/DaskStreaming.py @@ -0,0 +1,233 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import json +from confluent_kafka import Consumer, Producer, KafkaException, KafkaError +import pandas as pd +from dask.distributed import Client, LocalCluster +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +def SettingKafkaConsumerParams(): + return {'bootstrap.servers' : KafkaConfig.get_kafka_address(), + 'group.id' : 'analytics-backend', + 'auto.offset.reset' : 'latest'} + +def GetAggregationMappings(thresholds): + agg_dict = {} + for threshold_key in thresholds.keys(): + parts = threshold_key.split('_', 1) + if len(parts) != 2: + LOGGER.warning(f"Threshold key '{threshold_key}' does not follow the '_' format. 
Skipping.") + continue + aggregation, metric_name = parts + # Ensure that the aggregation function is valid in pandas + if aggregation not in ['mean', 'min', 'max', 'first', 'last', 'std']: + LOGGER.warning(f"Unsupported aggregation '{aggregation}' in threshold key '{threshold_key}'. Skipping.") + continue + agg_dict[threshold_key] = ('kpi_value', aggregation) + return agg_dict + +def ApplyThresholds(aggregated_df, thresholds): + """ + Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary + on the aggregated DataFrame. + Args: aggregated_df (pd.DataFrame): DataFrame with aggregated metrics. + thresholds (dict): Thresholds dictionary with keys in the format '_'. + Returns: pd.DataFrame: DataFrame with additional threshold columns. + """ + for threshold_key, threshold_values in thresholds.items(): + if threshold_key not in aggregated_df.columns: + LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.") + continue + if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2: + fail_th, raise_th = threshold_values + aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th + aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th + else: + LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.") + return aggregated_df + +def initialize_dask_client(): + """ + Initialize a local Dask cluster and client. + """ + cluster = LocalCluster(n_workers=2, threads_per_worker=2) + client = Client(cluster) + LOGGER.info(f"Dask Client Initialized: {client}") + return client, cluster + +def initialize_kafka_producer(): + return Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + +def delivery_report(err, msg): + if err is not None: + LOGGER.error(f"Message delivery failed: {err}") + else: + LOGGER.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + +def process_batch(batch, agg_mappings, thresholds): + """ + Process a batch of data and apply thresholds. + Args: batch (list of dict): List of messages from Kafka. + agg_mappings (dict): Mapping from threshold key to aggregation function. + thresholds (dict): Thresholds dictionary. + Returns: list of dict: Processed records ready to be sent to Kafka. + """ + if not batch: + LOGGER.info("Empty batch received. Skipping processing.") + return [] + + df = pd.DataFrame(batch) + df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce') + df.dropna(subset=['time_stamp'], inplace=True) + required_columns = {'time_stamp', 'kpi_id', 'kpi_value'} + if not required_columns.issubset(df.columns): + LOGGER.warning(f"Batch contains missing required columns. Required columns: {required_columns}. Skipping batch.") + return [] + if df.empty: + LOGGER.info("No data after filtering by KPI IDs. 
Skipping processing.") + return [] + + # Perform aggregations using named aggregation + try: + agg_dict = {key: value for key, value in agg_mappings.items()} + df_agg = df.groupby(['window_start', 'kpi_id']).agg(**agg_dict).reset_index() + except Exception as e: + LOGGER.error(f"Aggregation error: {e}") + return [] + + # Apply thresholds + df_thresholded = ApplyThresholds(df_agg, thresholds) + df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ') + # Convert aggregated DataFrame to list of dicts + result = df_thresholded.to_dict(orient='records') + LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.") + + return result + +def produce_result(result, producer, destination_topic): + for record in result: + try: + producer.produce( + destination_topic, + key=str(record.get('kpi_id', '')), + value=json.dumps(record), + callback=delivery_report + ) + except KafkaException as e: + LOGGER.error(f"Failed to produce message: {e}") + producer.flush() + LOGGER.info(f"Produced {len(result)} aggregated records to '{destination_topic}'.") + +def DaskStreamer(key, kpi_list, thresholds, stop_event, + window_size="30s", time_stamp_col="time_stamp"): + client, cluster = initialize_dask_client() + consumer_conf = SettingKafkaConsumerParams() + consumer = Consumer(consumer_conf) + consumer.subscribe([KafkaTopic.VALUE.value]) + producer = initialize_kafka_producer() + + # Parse window_size to seconds + try: + window_size_td = pd.to_timedelta(window_size) + window_size_seconds = window_size_td.total_seconds() + except Exception as e: + LOGGER.error(f"Invalid window_size format: {window_size}. Error: {e}") + window_size_seconds = 30 + LOGGER.info(f"Batch processing interval set to {window_size_seconds} seconds.") + + # Extract aggregation mappings from thresholds + agg_mappings = GetAggregationMappings(thresholds) + if not agg_mappings: + LOGGER.error("No valid aggregation mappings extracted from thresholds. Exiting streamer.") + consumer.close() + producer.flush() + client.close() + cluster.close() + return + try: + batch = [] + last_batch_time = time.time() + LOGGER.info("Starting to consume messages...") + + while not stop_event.is_set(): + msg = consumer.poll(1.0) + + if msg is None: + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + LOGGER.warning(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}") + else: + LOGGER.error(f"Kafka error: {msg.error()}") + continue + + try: + message_value = json.loads(msg.value().decode('utf-8')) + except json.JSONDecodeError as e: + LOGGER.error(f"JSON decode error: {e}") + continue + + try: + message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce') + if pd.isna(message_timestamp): + LOGGER.warning(f"Invalid timestamp in message: {message_value}. Skipping message.") + continue + window_start = message_timestamp.floor(window_size) + message_value['window_start'] = window_start + except Exception as e: + LOGGER.error(f"Error processing timestamp: {e}. 
Skipping message.") + continue + + if message_value['kpi_id'] not in kpi_list: + LOGGER.debug(f"KPI ID '{message_value['kpi_id']}' not in kpi_list. Skipping message.") + continue + + batch.append(message_value) + + current_time = time.time() + if (current_time - last_batch_time) >= window_size_seconds and batch: + LOGGER.info("Time-based batch threshold reached. Processing batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + batch = [] + last_batch_time = current_time + + except Exception as e: + LOGGER.exception(f"Error in Dask streaming process: {e}") + finally: + # Process any remaining messages in the batch + if batch: + LOGGER.info("Processing remaining messages in the batch.") + future = client.submit(process_batch, batch, agg_mappings, thresholds) + future.add_done_callback(lambda fut: produce_result(fut.result(), producer, KafkaTopic.ALARMS.value)) + consumer.close() + producer.flush() + LOGGER.info("Kafka consumer and producer closed.") + client.close() + cluster.close() + LOGGER.info("Dask client and cluster closed.") diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index e5faaa1f5..a1d4d0629 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -16,7 +16,7 @@ import uuid import json from common.proto.kpi_manager_pb2 import KpiId from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, - Analyzer ) + Analyzer, AnalyzerId ) def get_kpi_id_list(): return ["6e22f180-ba28-4641-b190-2287bf448888", "1e22f180-ba28-4641-b190-2287bf446666"] @@ -38,6 +38,13 @@ def get_threshold_dict(): op + '_value': threshold_dict[op+'_value'] for op in get_operation_list() if op + '_value' in threshold_dict } +def create_analyzer_id(): + _create_analyzer_id = AnalyzerId() + # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + return _create_analyzer_id + def create_analyzer(): _create_analyzer = Analyzer() @@ -70,4 +77,38 @@ def create_analyzer(): _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
No implemented yet - return _create_analyzer \ No newline at end of file + return _create_analyzer + +def create_analyzer_dask(): + _create_analyzer = Analyzer() + # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" + _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING + + _kpi_id = KpiId() + # input IDs to analyze + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.input_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.input_kpi_ids.append(_kpi_id) + # output IDs after analysis + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_analyzer.output_kpi_ids.append(_kpi_id) + # parameter + + _threshold_dict = { + 'mean_PKS_TX' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),#} + 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} + _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) + _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) + _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size + _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. 
No implemented yet + return _create_analyzer diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index c2e7fbe31..48ce86747 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -19,7 +19,9 @@ import threading from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyticsBackendService import AnalyticsBackendService from analytics.backend.tests.messages import get_kpi_id_list, get_operation_list, get_threshold_dict -from .messages import create_analyzer +from .messages import create_analyzer, create_analyzer_dask +from threading import Thread, Event +from ..service.DaskStreaming import DaskStreamer LOGGER = logging.getLogger(__name__) @@ -29,35 +31,108 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) + + +# --- To test Dask Streamer functionality --- +# def test_StartDaskStreamer(): # Directly from the Streamer class +# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") +# stop_event = Event() +# kpi_list = ["1e22f180-ba28-4641-b190-2287bf446666", "6e22f180-ba28-4641-b190-2287bf448888", 'kpi_3'] +# oper_list = ['avg', 'min', 'max',] +# thresholds = { +# 'avg_value': (10.0, 90.0), +# 'min_value': (5.0, 95.0), +# 'max_value': (15.0, 85.0), +# 'latency' : (2.0, 10.0) +# } + +# # Start the DaskStreamer in a separate thread +# streamer_thread = Thread( +# target=DaskStreamer, +# args=("analytics_stream", kpi_list, oper_list, thresholds, stop_event), +# kwargs={ +# "window_size": "60s", +# "win_slide_duration": "30s", +# "time_stamp_col": "time_stamp" +# } +# ) +# streamer_thread.start() +# try: +# while True: +# time.sleep(10) +# except KeyboardInterrupt: +# LOGGER.info("KeyboardInterrupt received. 
Stopping streamer...") +# stop_event.set() +# streamer_thread.join() +# LOGGER.info("Streamer stopped gracefully.") # --- To test Start Streamer functionality --- -def test_StartSparkStreamer(): - LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") - analyzer_obj = create_analyzer() - analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid - analyzer_to_generate : Dict = { - "algo_name" : analyzer_obj.algorithm_name, - "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], - "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], - "oper_mode" : analyzer_obj.operation_mode, - "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), - "window_size" : analyzer_obj.parameters["window_size"], - "window_slider" : analyzer_obj.parameters["window_slider"], - # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] - } - AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) - assert isinstance(response, bool) +# def test_StartDaskStreamer(): +# LOGGER.debug(" >>> test_StartBaskStreamer: START <<< ") +# analyzer_obj = create_analyzer_dask() +# # LOGGER.info("Created Analyzer Object: {:}".format(analyzer_obj)) +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "oper_list" : json.loads(analyzer_obj.parameters["oper_list"]), +# # "oper_list" : analyzer_obj.parameters["oper_list"], +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# LOGGER.info("Analyzer to be generated: {:}".format((analyzer_to_generate))) +# response = AnalyticsBackendServiceObj.StartDaskListener(analyzer_uuid, analyzer_to_generate) +# assert isinstance(response, bool) +# time.sleep(100) +# LOGGER.info('Initiating StopRequestListener...') +# # AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StopDaskListener(analyzer_uuid) +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) -# --- To TEST StartRequestListenerFunctionality +# --- To test Start Streamer functionality --- +# def test_StartSparkStreamer(): +# LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") +# analyzer_obj = create_analyzer() +# analyzer_uuid = analyzer_obj.analyzer_id.analyzer_id.uuid +# analyzer_to_generate : Dict = { +# "algo_name" : analyzer_obj.algorithm_name, +# "input_kpis" : [k.kpi_id.uuid for k in analyzer_obj.input_kpi_ids], +# "output_kpis" : [k.kpi_id.uuid for k in analyzer_obj.output_kpi_ids], +# "oper_mode" : analyzer_obj.operation_mode, +# "thresholds" : json.loads(analyzer_obj.parameters["thresholds"]), +# "window_size" : analyzer_obj.parameters["window_size"], +# "window_slider" : analyzer_obj.parameters["window_slider"], +# # "store_aggregate" : analyzer_obj.parameters["store_aggregate"] +# } +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) +# assert isinstance(response, bool) + +# --- To 
TEST StartRequestListenerFunctionality # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() -# threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start() +# AnalyticsBackendServiceObj.stop_event = Event() +# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) +# listener_thread.start() + +# time.sleep(100) + +# # AnalyticsBackendServiceObj.stop_event.set() +# # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') +# # listener_thread.join(timeout=10) +# # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." +# LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index eb25c33b0..e2d39585e 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -49,11 +49,12 @@ def create_analyzer(): _create_analyzer.output_kpi_ids.append(_kpi_id) # parameter _threshold_dict = { - # 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), - 'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)} + 'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50), + 'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10) + } _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - _create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" - _create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size + _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" + _create_analyzer.parameters['window_slider'] = "5s" # should be less than window size _create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet return _create_analyzer -- GitLab From 10d86c7670dc9022f59fcf2b15279df237069487 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Wed, 25 Sep 2024 04:47:50 +0000 Subject: [PATCH 07/12] changes in Analytics backend docker and requirement file. --- src/analytics/backend/Dockerfile | 9 -------- src/analytics/backend/requirements.in | 4 ++-- src/analytics/backend/tests/messages.py | 14 ++++++------ src/analytics/backend/tests/test_backend.py | 24 ++++++++++----------- 4 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/analytics/backend/Dockerfile b/src/analytics/backend/Dockerfile index ef49657cd..17adcd3ab 100644 --- a/src/analytics/backend/Dockerfile +++ b/src/analytics/backend/Dockerfile @@ -53,15 +53,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . 
\1_pb2/g' {} \; -# Install Java (required for PySpark) -RUN apt-get update && \ - apt-get install -y default-jdk && \ - apt-get clean - -# Set JAVA_HOME environment variable -ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 -ENV PATH=$JAVA_HOME/bin:$PATH - # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/analytics/backend WORKDIR /var/teraflow/analytics/backend diff --git a/src/analytics/backend/requirements.in b/src/analytics/backend/requirements.in index 04ab95c2d..360d94f46 100644 --- a/src/analytics/backend/requirements.in +++ b/src/analytics/backend/requirements.in @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -dask==2024.9.0 -distributed==2024.9.0 +dask==2024.1.0 +distributed==2024.1.0 pandas==2.2.3 confluent-kafka==2.3.* diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index a1d4d0629..cdc6c3442 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -81,20 +81,20 @@ def create_analyzer(): def create_analyzer_dask(): _create_analyzer = Analyzer() - # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) + # _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.input_kpi_ids.append(_kpi_id) - _kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_analyzer.input_kpi_ids.append(_kpi_id) # output IDs after analysis _kpi_id.kpi_id.uuid = str(uuid.uuid4()) @@ -104,8 +104,8 @@ def create_analyzer_dask(): # parameter _threshold_dict = { - 'mean_PKS_TX' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),#} - 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} + 'mean_latency' :(20, 30), 'min_latency' :(00, 10), 'max_latency' :(45, 50),#} + 'first_value' :(00, 50), 'last_value' :(50, 100), 'std_value' :(0, 90)} _create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) _create_analyzer.parameters['oper_list'] = json.dumps([key.split('_')[0] for key in _threshold_dict.keys()]) _create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks" diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 48ce86747..470729160 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -119,20 +119,20 @@ LOGGER = logging.getLogger(__name__) # assert isinstance(response, bool) # --- To TEST StartRequestListenerFunctionality -# def test_StartRequestListener(): -# LOGGER.info('test_RunRequestListener') -# AnalyticsBackendServiceObj = AnalyticsBackendService() -# AnalyticsBackendServiceObj.stop_event = Event() -# listener_thread = 
Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) -# listener_thread.start() +def test_StartRequestListener(): + LOGGER.info('test_RunRequestListener') + AnalyticsBackendServiceObj = AnalyticsBackendService() + AnalyticsBackendServiceObj.stop_event = Event() + listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) + listener_thread.start() - time.sleep(100) + time.sleep(100) # AnalyticsBackendServiceObj.stop_event.set() # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # listener_thread.join(timeout=10) # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." - LOGGER.info('Completed test_RunRequestListener') + LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): -- GitLab From 5f72599e1ad643698ff8c763a46eb57cfed064f5 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Wed, 25 Sep 2024 06:50:15 +0000 Subject: [PATCH 08/12] Changes in Analytics backend and KPI Value API - Analytics backend Request listener will be terminated in case the topic doesn't exist. - The GetAlarms() in KpiValueApi is updated to process the new response from the Dask streamer. --- .../run_tests_locally-analytics-backend.sh | 1 + .../run_tests_locally-analytics-frontend.sh | 1 + scripts/run_tests_locally-kpi-value-API.sh | 5 +- .../service/AnalyticsBackendService.py | 2 +- .../service/KpiValueApiServiceServicerImpl.py | 47 +++++++++---------- 5 files changed, 29 insertions(+), 27 deletions(-) diff --git a/scripts/run_tests_locally-analytics-backend.sh b/scripts/run_tests_locally-analytics-backend.sh index 99a789c9f..722779824 100755 --- a/scripts/run_tests_locally-analytics-backend.sh +++ b/scripts/run_tests_locally-analytics-backend.sh @@ -18,6 +18,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh index e30d30da6..e74eb4ec1 100755 --- a/scripts/run_tests_locally-analytics-frontend.sh +++ b/scripts/run_tests_locally-analytics-frontend.sh @@ -18,6 +18,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/scripts/run_tests_locally-kpi-value-API.sh b/scripts/run_tests_locally-kpi-value-API.sh index 3953d2a89..96ac558ba 100755 --- a/scripts/run_tests_locally-kpi-value-API.sh +++ b/scripts/run_tests_locally-kpi-value-API.sh @@ -19,8 +19,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc -KAFKA_IP=$(docker inspect kafka
--format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") -KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +# KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}") +# export KFK_SERVER_ADDRESS=${KAFKA_IP}:9092 +export KFK_SERVER_ADDRESS='127.0.0.1:9092' # helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG -o log_cli=true --verbose \ kpi_value_api/tests/test_kpi_value_api.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 367d91d97..f9c3b86a2 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -61,7 +61,7 @@ class AnalyticsBackendService(GenericGrpcService): else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) print ("Consumer error: {:}".format(receive_msg.error())) - continue + break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 0e2d49300..706e180d5 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -46,7 +46,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'analytics-frontend', + 'group.id' : 'kpi-value-api-frontend', 'auto.offset.reset' : 'latest'}) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -152,9 +152,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid): response.start_timestamp.timestamp = datetime.strptime( - value["window"]["start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() - response.end_timestamp.timestamp = datetime.strptime( - value["window"]["end"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() + value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() response.kpi_id.kpi_id.uuid = value['kpi_id'] for key, threshold in value.items(): if key not in ['kpi_id', 'window']: @@ -183,10 +181,10 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): key, value = self.result_queue.get() # Wait until a result is available LOGGER.info("In while true ...") yield key, value # Yield the result to the calling function - except KeyboardInterrupt: - LOGGER.warning("Listener stopped manually.") + except Exception as e: + LOGGER.warning("Listener stopped. 
Error: {:}".format(e)) finally: - self.StopListener() + self.scheduler.shutdown() def response_listener(self, filter_key=None): """ @@ -196,23 +194,24 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): consumer = self.kafka_consumer consumer.subscribe([self.listener_topic]) - msg = consumer.poll(2.0) - if msg is None: - return - elif msg.error(): - if msg.error().code() != KafkaError._PARTITION_EOF: - LOGGER.error(f"Kafka error: {msg.error()}") - return - try: - key = msg.key().decode('utf-8') if msg.key() else None - if filter_key is not None and key == filter_key: - value = json.loads(msg.value().decode('utf-8')) - LOGGER.info(f"Received key: {key}, value: {value}") - self.result_queue.put((key, value)) - else: - LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") - except Exception as e: - LOGGER.error(f"Error processing Kafka message: {e}") + while True: + msg = consumer.poll(1.0) + if msg is None: + continue + elif msg.error(): + if msg.error().code() != KafkaError._PARTITION_EOF: + LOGGER.error(f"Kafka error: {msg.error()}") + break + try: + key = msg.key().decode('utf-8') if msg.key() else None + if filter_key is not None and key == filter_key: + value = json.loads(msg.value().decode('utf-8')) + LOGGER.info(f"Received key: {key}, value: {value}") + self.result_queue.put((key, value)) + else: + LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}") + except Exception as e: + LOGGER.error(f"Error processing Kafka message: {e}") def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) -- GitLab From ff9445f31228e1d03dc4723bc8cdcf84d7765149 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 7 Oct 2024 07:12:45 +0000 Subject: [PATCH 09/12] Changes to refine code for monitoring E2E test. 
--- scripts/run_tests_locally-telemetry-backend.sh | 1 + scripts/run_tests_locally-telemetry-frontend.sh | 1 + src/analytics/backend/tests/test_backend.py | 16 ++++++++-------- .../AnalyticsFrontendServiceServicerImpl.py | 4 +++- src/analytics/frontend/tests/test_frontend.py | 8 ++++---- src/analytics/tests/test_analytics_db.py | 3 ++- src/telemetry/backend/tests/test_backend.py | 8 ++++---- src/telemetry/tests/test_telemetryDB.py | 3 ++- 8 files changed, 25 insertions(+), 19 deletions(-) diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 0afeccb30..745d77c62 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -22,6 +22,7 @@ cd $PROJECTDIR/src # kpi_manager/tests/test_unitary.py # python3 kpi_manager/tests/test_unitary.py +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index 8e0989eca..a6447cb4c 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -19,6 +19,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py +export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 470729160..79d760f8e 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -119,20 +119,20 @@ LOGGER = logging.getLogger(__name__) # assert isinstance(response, bool) # --- To TEST StartRequestListenerFunctionality -def test_StartRequestListener(): - LOGGER.info('test_RunRequestListener') - AnalyticsBackendServiceObj = AnalyticsBackendService() - AnalyticsBackendServiceObj.stop_event = Event() - listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) - listener_thread.start() +# def test_StartRequestListener(): +# LOGGER.info('test_RunRequestListener') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# AnalyticsBackendServiceObj.stop_event = Event() +# listener_thread = Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()) +# listener_thread.start() - time.sleep(100) +# time.sleep(100) # AnalyticsBackendServiceObj.stop_event.set() # LOGGER.info('Backend termination initiated. waiting for termination... 10 seconds') # listener_thread.join(timeout=10) # assert not listener_thread.is_alive(), "RequestListener thread did not terminate as expected." 
- LOGGER.info('Completed test_RunRequestListener') + # LOGGER.info('Completed test_RunRequestListener') # To test START and STOP communication together # def test_StopRequestListener(): diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 9ffacecc3..e304d7acb 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, json +import logging, grpc, json, queue from typing import Dict from confluent_kafka import Producer as KafkaProducer @@ -24,6 +24,8 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger LOGGER = logging.getLogger(__name__) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 6a126905c..3898ec65e 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) # ----- core funtionality test ----- # def test_StartAnalytics(analyticsFrontend_client): diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py index 58e7d0167..2794edb4a 100644 --- a/src/analytics/tests/test_analytics_db.py +++ b/src/analytics/tests/test_analytics_db.py @@ -15,12 +15,13 @@ import logging from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database.AnalyzerModel import Analyzer LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') - AnalyzerDBobj = AnalyzerDB() + AnalyzerDBobj = AnalyzerDB(Analyzer) # AnalyzerDBobj.drop_database() # AnalyzerDBobj.verify_tables() AnalyzerDBobj.create_database() diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 4764d7f5f..331447716 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -26,10 +26,10 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = 
KafkaTopic.create_all_topics() + assert isinstance(response, bool) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py index 1b122e4bc..bbc02a2a2 100644 --- a/src/telemetry/tests/test_telemetryDB.py +++ b/src/telemetry/tests/test_telemetryDB.py @@ -15,12 +15,13 @@ import logging from telemetry.database.Telemetry_DB import TelemetryDB +from telemetry.database.TelemetryModel import Collector as CollectorModel LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') - TelemetryDBobj = TelemetryDB() + TelemetryDBobj = TelemetryDB(CollectorModel) # TelemetryDBobj.drop_database() # TelemetryDBobj.verify_tables() TelemetryDBobj.create_database() -- GitLab From e44a93bae96165d4a3115f00584e8c26ab15f85a Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 7 Oct 2024 07:32:11 +0000 Subject: [PATCH 10/12] Verifies all logger statements for E2E test --- .../service/AnalyticsBackendService.py | 20 ++++++++-------- src/analytics/backend/tests/test_backend.py | 8 +++---- .../AnalyticsFrontendServiceServicerImpl.py | 13 ++++------- src/analytics/frontend/tests/test_frontend.py | 2 +- .../service/TelemetryBackendService.py | 23 ++++++++++--------- src/telemetry/backend/tests/test_backend.py | 8 +++---- .../TelemetryFrontendServiceServicerImpl.py | 17 +++++++++----- 7 files changed, 47 insertions(+), 44 deletions(-) diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index f9c3b86a2..a8c790e78 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -48,7 +48,7 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") - print ("Request Listener is initiated ...") + # print ("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: @@ -60,13 +60,13 @@ class AnalyticsBackendService(GenericGrpcService): continue else: LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - print ("Consumer error: {:}".format(receive_msg.error())) + # print ("Consumer error: {:}".format(receive_msg.error())) break try: analyzer = json.loads(receive_msg.value().decode('utf-8')) analyzer_uuid = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + # print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: self.StopDaskListener(analyzer_uuid) @@ -74,7 +74,7 @@ class AnalyticsBackendService(GenericGrpcService): self.StartDaskListener(analyzer_uuid, analyzer) except Exception as e: LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) - print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + # print ("Unable to consume message from topic: {:}. 
ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) def StartDaskListener(self, analyzer_uuid, analyzer): kpi_list = analyzer[ 'input_kpis' ] @@ -84,8 +84,8 @@ class AnalyticsBackendService(GenericGrpcService): LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:}".format( kpi_list, thresholds, window_size, window_slider)) - print ("Received parameters: {:} - {:} - {:} - {:}".format( - kpi_list, thresholds, window_size, window_slider)) + # print ("Received parameters: {:} - {:} - {:} - {:}".format( + # kpi_list, thresholds, window_size, window_slider)) try: stop_event = Event() thread = Thread( @@ -98,11 +98,11 @@ class AnalyticsBackendService(GenericGrpcService): ) thread.start() self.running_threads[analyzer_uuid] = (thread, stop_event) - print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) + # print ("Initiated Analyzer backend: {:}".format(analyzer_uuid)) LOGGER.info("Initiated Analyzer backend: {:}".format(analyzer_uuid)) return True except Exception as e: - print ("Failed to initiate Analyzer backend: {:}".format(e)) + # print ("Failed to initiate Analyzer backend: {:}".format(e)) LOGGER.error("Failed to initiate Analyzer backend: {:}".format(e)) return False @@ -113,12 +113,12 @@ class AnalyticsBackendService(GenericGrpcService): stop_event.set() thread.join() del self.running_threads[analyzer_uuid] - print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) + # print ("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.info("Terminating backend (by TerminateBackend): Analyzer Id: {:}".format(analyzer_uuid)) return True except Exception as e: LOGGER.error("Failed to terminate. Analyzer Id: {:} - ERROR: {:}".format(analyzer_uuid, e)) return False else: - print ("Analyzer not found in active collectors. Analyzer Id: {:}".format(analyzer_uuid)) + # print ("Analyzer not found in active collectors. 
Analyzer Id: {:}".format(analyzer_uuid)) LOGGER.warning("Analyzer not found in active collectors: Analyzer Id: {:}".format(analyzer_uuid)) diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 79d760f8e..01d71df84 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -31,10 +31,10 @@ LOGGER = logging.getLogger(__name__) ########################### # --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) +def test_validate_kafka_topics(): + LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + response = KafkaTopic.create_all_topics() + assert isinstance(response, bool) # --- To test Dask Streamer functionality --- diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index e304d7acb..323113bb0 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -16,6 +16,7 @@ import logging, grpc, json, queue from typing import Dict from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaError from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty @@ -96,10 +97,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...") try: while True: - LOGGER.info("entering while...") - key, value = self.result_queue.get() # Wait until a result is available - LOGGER.info("In while true ...") - yield key, value # Yield the result to the calling function + key, value = self.result_queue.get() + yield key, value except KeyboardInterrupt: LOGGER.warning("Listener stopped manually.") finally: @@ -129,8 +128,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): self.result_queue.put((key, value)) else: LOGGER.info(f"Skipping message with unmatched key: {key}") - # value = json.loads(msg.value().decode('utf-8')) # Added for debugging - # self.result_queue.put((filter_key, value)) # Added for debugging except Exception as e: LOGGER.error(f"Error processing Kafka message: {e}") @@ -191,7 +188,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print ('Message delivery failed: {:}'.format(err)) + # print ('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - print('Message delivered to topic {:}'.format(msg.topic())) + # print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 3898ec65e..526c32eb8 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -97,7 +97,7 @@ def test_validate_kafka_topics(): # assert isinstance(response, AnalyzerId) # To test start and stop listener together -def test_StartStopAnalyzers(analyticsFrontend_client): +def test_StartAnalyzers(analyticsFrontend_client): LOGGER.info(' >>> 
test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index f3cf18d65..79a35d343 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -55,7 +55,7 @@ class TelemetryBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info('Telemetry backend request listener is running ...') - print ('Telemetry backend request listener is running ...') + # print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.REQUEST.value]) while True: @@ -66,13 +66,13 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {}".format(receive_msg.error())) break try: collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + # print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) if collector['duration'] == -1 and collector['interval'] == -1: self.TerminateCollectorBackend(collector_id) @@ -80,18 +80,19 @@ class TelemetryBackendService(GenericGrpcService): self.RunInitiateCollectorBackend(collector_id, collector) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) - print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + # print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) def TerminateCollectorBackend(self, collector_id): if collector_id in self.running_threads: thread, stop_event = self.running_threads[collector_id] stop_event.set() thread.join() - print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) + # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id) del self.running_threads[collector_id] self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. else: - print ('Backend collector {:} not found'.format(collector_id)) + # print ('Backend collector {:} not found'.format(collector_id)) + LOGGER.warning('Backend collector {:} not found'.format(collector_id)) def RunInitiateCollectorBackend(self, collector_id: str, collector: str): stop_event = threading.Event() @@ -104,7 +105,8 @@ class TelemetryBackendService(GenericGrpcService): """ Method receives collector request and initiates collecter backend. """ - print("Initiating backend for collector: ", collector_id) + # print("Initiating backend for collector: ", collector_id) + LOGGER.info("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend @@ -136,8 +138,7 @@ class TelemetryBackendService(GenericGrpcService): Method to extract kpi value. 
""" measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device - print ("Measured Kpi value: {:}".format(measured_kpi_value)) - # measured_kpi_value = self.fetch_node_exporter_metrics() # exporter extracted metric value against default KPI + # print ("Measured Kpi value: {:}".format(measured_kpi_value)) self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value) def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): @@ -166,7 +167,7 @@ class TelemetryBackendService(GenericGrpcService): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print(f'Message delivery failed: {err}') + # print(f'Message delivery failed: {err}') else: LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - print(f'Message delivered to topic {msg.topic()}') + # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 331447716..8bbde9769 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -31,7 +31,7 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_RunRequestListener(): - LOGGER.info('test_RunRequestListener') - TelemetryBackendServiceObj = TelemetryBackendService() - threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file +# def test_RunRequestListener(): +# LOGGER.info('test_RunRequestListener') +# TelemetryBackendServiceObj = TelemetryBackendService() +# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index ad99dff12..5c569e2dd 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -153,7 +153,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - print('Message delivery failed: {:}'.format(err)) + # print('Message delivery failed: {:}'.format(err)) # else: # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) # print('Message delivered to topic {:}'.format(msg.topic())) @@ -177,7 +177,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue else: - print("Consumer error: {}".format(receive_msg.error())) + # print("Consumer error: {:}".format(receive_msg.error())) + LOGGER.error("Consumer error: {:}".format(receive_msg.error())) break try: collector_id = receive_msg.key().decode('utf-8') @@ -185,13 +186,17 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): kpi_value = json.loads(receive_msg.value().decode('utf-8')) self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) else: - print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") + LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) except Exception as e: - print(f"Error 
extarcting msg key or value: {str(e)}") + # print(f"Error extarcting msg key or value: {str(e)}") + LOGGER.info("Error extarcting msg key or value: {:}".format(e)) continue def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): if kpi_id == "-1" and kpi_value == -1: - print ("Backend termination confirmation for collector id: ", collector_id) + # print ("Backend termination confirmation for collector id: ", collector_id) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) else: - print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + LOGGER.info("Backend termination confirmation for collector id: ", collector_id) + # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) -- GitLab From 7a505ea280765ec3c8c7c27636f354a5900d9dc5 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 7 Oct 2024 15:06:29 +0000 Subject: [PATCH 11/12] Pre-merge code cleanup --- proto/kpi_value_api.proto | 4 ++-- src/telemetry/requirements.in | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index 4d3a1f216..4bd3dd50f 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,8 +19,8 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList) returns (context.Empty) {} - rpc SelectKpiValues (KpiValueFilter) returns (KpiValueList) {} + rpc StoreKpiValues (KpiValueList ) returns (context.Empty ) {} + rpc SelectKpiValues (KpiValueFilter ) returns (KpiValueList ) {} rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in index 4e9981afb..503468a66 100644 --- a/src/telemetry/requirements.in +++ b/src/telemetry/requirements.in @@ -17,4 +17,4 @@ psycopg2-binary==2.9.3 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2024.1 -requests==2.27.1 \ No newline at end of file +requests==2.27.1 -- GitLab From 478d1a8ed29b8bb85d4e194d1becca389f7af5b5 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 7 Oct 2024 15:07:57 +0000 Subject: [PATCH 12/12] Pre-merge code cleanup --- proto/kpi_value_api.proto | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/proto/kpi_value_api.proto b/proto/kpi_value_api.proto index 4bd3dd50f..a97b0ae2b 100644 --- a/proto/kpi_value_api.proto +++ b/proto/kpi_value_api.proto @@ -19,19 +19,19 @@ import "context.proto"; import "kpi_manager.proto"; service KpiValueAPIService { - rpc StoreKpiValues (KpiValueList ) returns (context.Empty ) {} - rpc SelectKpiValues (KpiValueFilter ) returns (KpiValueList ) {} + rpc StoreKpiValues (KpiValueList ) returns (context.Empty ) {} + rpc SelectKpiValues (KpiValueFilter ) returns (KpiValueList ) {} rpc GetKpiAlarms (kpi_manager.KpiId) returns (stream KpiAlarms) {} } message KpiValue { - kpi_manager.KpiId kpi_id = 1; - context.Timestamp timestamp = 2; - KpiValueType kpi_value_type = 3; + kpi_manager.KpiId kpi_id = 1; + context.Timestamp timestamp = 2; + KpiValueType kpi_value_type = 3; } message KpiValueList { - repeated KpiValue kpi_value_list = 1; + repeated KpiValue kpi_value_list = 1; } message KpiValueType { @@ -47,9 +47,9 @@ message KpiValueType { } message KpiValueFilter { - repeated kpi_manager.KpiId kpi_id = 1; - repeated context.Timestamp start_timestamp = 2; - repeated context.Timestamp end_timestamp = 3; + repeated kpi_manager.KpiId kpi_id = 1; + repeated 
context.Timestamp start_timestamp = 2; + repeated context.Timestamp end_timestamp = 3; } message KpiAlarms { -- GitLab
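
Note on the threshold convention used by the Dask streamer introduced above: analyzer thresholds are keyed as '<aggregation>_<metric>' and each key maps to a (TH-Fall, TH-Raise) pair; GetAggregationMappings() turns those keys into pandas named aggregations and ApplyThresholds() adds *_THRESHOLD_FALL / *_THRESHOLD_RAISE columns. The snippet below is a minimal, self-contained sketch of that logic using pandas only; the KPI id is reused from the test messages, while the window label and sample values are illustrative assumptions, not part of the patches.

import pandas as pd

# Thresholds follow the '<aggregation>_<metric>' convention; each key maps to (TH-Fall, TH-Raise).
thresholds = {'mean_value': (20, 30), 'max_value': (45, 50)}

# Build pandas named-aggregation mappings, mirroring GetAggregationMappings().
agg_mappings = {key: ('kpi_value', key.split('_', 1)[0]) for key in thresholds}

# Toy batch of KPI samples (values are illustrative only).
batch = pd.DataFrame({
    'window_start': ['2024-10-07T00:00:00Z'] * 3,
    'kpi_id'      : ['6e22f180-ba28-4641-b190-2287bf448888'] * 3,
    'kpi_value'   : [10.0, 25.0, 55.0],
})

# Aggregate per (window, KPI) and derive alarm columns, mirroring ApplyThresholds().
df_agg = batch.groupby(['window_start', 'kpi_id']).agg(**agg_mappings).reset_index()
for key, (fall_th, raise_th) in thresholds.items():
    df_agg[f'{key}_THRESHOLD_FALL']  = df_agg[key] < fall_th
    df_agg[f'{key}_THRESHOLD_RAISE'] = df_agg[key] > raise_th

# Records of this shape are what the streamer serializes and publishes to the alarms topic.
print(df_agg.to_dict(orient='records'))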