diff --git a/my_deploy.sh b/my_deploy.sh index 88be82b63e9e79a97ee79702de886f69a6152f94..5b535a9593f8f6fabfbe4c5b697ef240276c7e65 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" # Uncomment to activate Monitoring Framework (new) -#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api" +#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics" # Uncomment to activate BGP-LS Speaker #export TFS_COMPONENTS="${TFS_COMPONENTS} bgpls_speaker" diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh index 4953b49e0a437becfda1648c722bcdcf92c58d93..ad1b4c57b6632266d539db07637fb1c0b024cf36 100755 --- a/scripts/run_tests_locally-kpi-DB.sh +++ b/scripts/run_tests_locally-kpi-DB.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace ${CRDB_NAMESPACE} -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_db.py diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh index 4b9a417603cc42a4e7e8b19c7394cc38633817fa..529774f2da703347abafdc168b932ef4ac29bed1 100755 --- a/scripts/run_tests_locally-telemetry-DB.sh +++ b/scripts/run_tests_locally-telemetry-DB.sh @@ -20,7 +20,8 @@ cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ # kpi_manager/tests/test_unitary.py - +CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \ telemetry/tests/test_telemetryDB.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 422574d0e8b926bc734c83a7bee1c5370f1c581a..4867335a53ee17c1fa279b2ee6bcf2bbac6bd1ba 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -26,4 +26,4 @@ CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o js export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ - telemetry/backend/tests/test_TelemetryBackend.py + telemetry/backend/tests/test_backend.py diff --git a/src/analytics/backend/service/AnalyticsBackendService.py b/src/analytics/backend/service/AnalyticsBackendService.py index 658d237956b4ed3addbbc295ef0d19dd4b977257..96b1f52428a2e612f7869ca90e878f4774cf6b8a 100755 --- a/src/analytics/backend/service/AnalyticsBackendService.py +++ b/src/analytics/backend/service/AnalyticsBackendService.py @@ -46,10 +46,10 @@ class AnalyticsBackendService(GenericGrpcService): thresholds = analyzer['thresholds'] window_size = analyzer['window_size'] window_slider = analyzer['window_slider'] - print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) - LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( - kpi_list, oper_list, thresholds, window_size, window_slider)) + # print ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + # kpi_list, oper_list, thresholds, window_size, window_slider)) + # LOGGER.debug ("Received parameters: {:} - {:} - {:} - {:} - {:}".format( + # kpi_list, oper_list, thresholds, window_size, window_slider)) try: stop_event = threading.Event() thread = threading.Thread(target=SparkStreamer, @@ -86,6 +86,7 @@ class AnalyticsBackendService(GenericGrpcService): listener for requests on Kafka topic. """ LOGGER.info("Request Listener is initiated ...") + print ("Request Listener is initiated ...") consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.ANALYTICS_REQUEST.value]) while True: @@ -98,17 +99,19 @@ class AnalyticsBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - analyzer = json.loads(receive_msg.value().decode('utf-8')) - analyzer_uuid = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + try: + analyzer = json.loads(receive_msg.value().decode('utf-8')) + analyzer_uuid = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) + print ('Recevied Analyzer: {:} - {:}'.format(analyzer_uuid, analyzer)) - if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: - self.TerminateAnalyzerBackend(analyzer_uuid) - else: - self.StartSparkStreamer(analyzer_uuid, analyzer) - LOGGER.debug("Stop Event activated. Terminating...") - print ("Stop Event activated. Terminating...") + if analyzer["algo_name"] is None and analyzer["oper_mode"] is None: + self.TerminateAnalyzerBackend(analyzer_uuid) + else: + self.StartSparkStreamer(analyzer_uuid, analyzer) + except Exception as e: + LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) + print ("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.ANALYTICS_REQUEST.value, e)) def TerminateAnalyzerBackend(self, analyzer_uuid): if analyzer_uuid in self.running_threads: diff --git a/src/analytics/backend/tests/messages.py b/src/analytics/backend/tests/messages.py index c3b78967efe13eef9a60e19e50e56bdfca4a410d..e5faaa1f5903d97d5201b46119eff931f420696b 100644 --- a/src/analytics/backend/tests/messages.py +++ b/src/analytics/backend/tests/messages.py @@ -42,14 +42,14 @@ def get_threshold_dict(): def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _kpi_id = KpiId() # input IDs to analyze _kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" _create_analyzer.input_kpi_ids.append(_kpi_id) _kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" diff --git a/src/analytics/backend/tests/test_backend.py b/src/analytics/backend/tests/test_backend.py index 9221bb23ee041da06a4c1f401c75d1906f6748b0..c2e7fbe3100e9904b1128ac5349566c3bb7c8f51 100644 --- a/src/analytics/backend/tests/test_backend.py +++ b/src/analytics/backend/tests/test_backend.py @@ -34,6 +34,7 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +# --- To test Start Streamer functionality --- def test_StartSparkStreamer(): LOGGER.debug(" >>> test_StartSparkStreamer: START <<< ") analyzer_obj = create_analyzer() @@ -52,26 +53,25 @@ def test_StartSparkStreamer(): response = AnalyticsBackendServiceObj.StartSparkStreamer(analyzer_uuid, analyzer_to_generate) assert isinstance(response, bool) +# --- To TEST StartRequestListenerFunctionality # def test_StartRequestListener(): # LOGGER.info('test_RunRequestListener') # AnalyticsBackendServiceObj = AnalyticsBackendService() -# response = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) -# LOGGER.debug(str(response)) -# assert isinstance(response, tuple) +# threading.Thread(target=AnalyticsBackendServiceObj.RequestListener, args=()).start() # To test START and STOP communication together -def test_StopRequestListener(): - LOGGER.info('test_RunRequestListener') - LOGGER.info('Initiating StartRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) - # LOGGER.debug(str(response_thread)) - time.sleep(10) - LOGGER.info('Initiating StopRequestListener...') - AnalyticsBackendServiceObj = AnalyticsBackendService() - response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) - LOGGER.debug(str(response)) - assert isinstance(response, bool) +# def test_StopRequestListener(): +# LOGGER.info('test_RunRequestListener') +# LOGGER.info('Initiating StartRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response_thread = AnalyticsBackendServiceObj.StartRequestListener() # response is Tuple (thread, stop_event) +# # LOGGER.debug(str(response_thread)) +# time.sleep(10) +# LOGGER.info('Initiating StopRequestListener...') +# AnalyticsBackendServiceObj = AnalyticsBackendService() +# response = AnalyticsBackendServiceObj.StopRequestListener(response_thread) +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) # To independently tests the SparkListener functionality # def test_SparkListener(): diff --git a/src/analytics/frontend/requirements.in b/src/analytics/frontend/requirements.in index d81b9ddbeafeff94c830d48ca5594e775b9ce240..0b1ec921b8bb77c0d26e8240585a19ef165f0eec 100644 --- a/src/analytics/frontend/requirements.in +++ b/src/analytics/frontend/requirements.in @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -apscheduler==3.10.4 +apscheduler==3.10.1 confluent-kafka==2.3.* psycopg2-binary==2.9.* SQLAlchemy==1.4.* diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py index 646de962e8a213582fdb7cd1446ab57bda561a96..eb25c33b03744d627efae0a436e5bdce4553b4af 100644 --- a/src/analytics/frontend/tests/messages.py +++ b/src/analytics/frontend/tests/messages.py @@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze def create_analyzer_id(): _create_analyzer_id = AnalyzerId() # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + # _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" return _create_analyzer_id def create_analyzer(): _create_analyzer = Analyzer() # _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) - _create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6" + _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold" _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 9f5c040f366b02a6fea27e8e8696c0c118ece05a..1b4e0e14e687b454cdfdefe466dfd11e84bf245b 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -26,7 +26,6 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name, from common.tools.kafka.Variables import KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue -from common.proto.analytics_frontend_pb2 import AnalyzerAlarms from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer, @@ -34,7 +33,7 @@ from analytics.frontend.tests.messages import ( create_analyze from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger - +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList ########################### # Tests Setup @@ -85,46 +84,23 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -# # ----- core funtionality test ----- -def test_StartAnalytics(analyticsFrontend_client): - LOGGER.info(' >>> test_StartAnalytic START: <<< ') - stream = analyticsFrontend_client.StartAnalyzer(create_analyzer()) - for response in stream: - LOGGER.debug(str(response)) - assert isinstance(response, KpiValue) - -# To test start and stop listener together def test_StartStopAnalyzers(analyticsFrontend_client): - LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ') - LOGGER.info('--> StartAnalyzer') + LOGGER.info(' >>> test_StartAnalyzers START: <<< ') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) - LOGGER.info('--> StopAnalyzer') - response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) + assert isinstance(added_analyzer_id, AnalyzerId) + +def test_StopAnalytic(analyticsFrontend_client): + LOGGER.info(' >>> test_StopAnalytic START: <<< ') + response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) LOGGER.debug(str(response)) + assert isinstance(response, Empty) -# def test_SelectAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_SelectAnalytics START: <<< ') -# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerList) - -# def test_StopAnalytic(analyticsFrontend_client): -# LOGGER.info(' >>> test_StopAnalytic START: <<< ') -# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_SelectAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_SelectAnalytics START: <<< ') + response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, AnalyzerList) # def test_ResponseListener(): # LOGGER.info(' >>> test_ResponseListener START <<< ') diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index b5cb3bbe07fee03f7e13cadb0b4d87766e80cb1c..9e432d637e70236d192d5248247175ef310d8368 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -25,11 +25,12 @@ class KafkaConfig(Enum): @staticmethod def get_kafka_address() -> str: - # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) - # if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + # kafka_server_address = "127.0.0.1:9092" return kafka_server_address @staticmethod @@ -90,4 +91,4 @@ class KafkaTopic(Enum): return False return True -# create all topics after the deployments (Telemetry and Analytics) +# TODO: create all topics after the deployments (Telemetry and Analytics) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 7b5c45859b6c10056211f9f33df950d9668c11ea..08a2dbf7334c3e4e68c3cfa6c27ce08532521342 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -27,6 +27,8 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index e95d6d8bbd81abca7eb2622e1faf6af473fcdb12..0615fa833f255bf91fd72fc484e40842face7a44 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -15,4 +15,4 @@ confluent-kafka==2.3.* requests==2.27.* prometheus-api-client==0.5.3 -apscheduler==3.10.4 +apscheduler==3.10.1 diff --git a/src/kpi_value_api/tests/test_kpi_value_api.py b/src/kpi_value_api/tests/test_kpi_value_api.py index ea6b22585a68d565c8162f1629a6a1ac4a4f6d6a..c7eb8fef1c79d0280c951d89971695d2c39ab599 100644 --- a/src/kpi_value_api/tests/test_kpi_value_api.py +++ b/src/kpi_value_api/tests/test_kpi_value_api.py @@ -78,6 +78,13 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) +# def test_GetKpiAlarms(kpi_value_api_client): +# LOGGER.debug(" >>> test_GetKpiAlarms") +# stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request()) +# for response in stream: +# LOGGER.debug(str(response)) +# assert isinstance(response, KpiAlarms) + def test_store_kpi_values(kpi_value_api_client): LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 078fa5896d5fb5033833e0e2ef2248613ef80c18..f3cf18d65eac0fda6c56a366ce8d806a4ea2562a 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -17,7 +17,8 @@ import time import random import logging import threading -from typing import Any, Dict +from typing import Any, Dict +from datetime import datetime, timezone # from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -53,6 +54,8 @@ class TelemetryBackendService(GenericGrpcService): """ listener for requests on Kafka topic. """ + LOGGER.info('Telemetry backend request listener is running ...') + print ('Telemetry backend request listener is running ...') consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.REQUEST.value]) while True: @@ -65,16 +68,19 @@ class TelemetryBackendService(GenericGrpcService): else: print("Consumer error: {}".format(receive_msg.error())) break - - collector = json.loads(receive_msg.value().decode('utf-8')) - collector_id = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + try: + collector = json.loads(receive_msg.value().decode('utf-8')) + collector_id = receive_msg.key().decode('utf-8') + LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + print('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) - else: - self.RunInitiateCollectorBackend(collector_id, collector) + if collector['duration'] == -1 and collector['interval'] == -1: + self.TerminateCollectorBackend(collector_id) + else: + self.RunInitiateCollectorBackend(collector_id, collector) + except Exception as e: + LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) + print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e)) def TerminateCollectorBackend(self, collector_id): if collector_id in self.running_threads: @@ -101,7 +107,7 @@ class TelemetryBackendService(GenericGrpcService): print("Initiating backend for collector: ", collector_id) start_time = time.time() while not stop_event.is_set(): - if time.time() - start_time >= collector['duration']: # condition to terminate backend + if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time) self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. break @@ -140,7 +146,7 @@ class TelemetryBackendService(GenericGrpcService): """ producer = self.kafka_producer kpi_value : Dict = { - "time_stamp": str(time.time()), + "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id" : kpi_id, "kpi_value" : measured_kpi_value } diff --git a/src/telemetry/backend/tests/messagesBackend.py b/src/telemetry/backend/tests/messages.py similarity index 100% rename from src/telemetry/backend/tests/messagesBackend.py rename to src/telemetry/backend/tests/messages.py diff --git a/src/telemetry/backend/tests/test_TelemetryBackend.py b/src/telemetry/backend/tests/test_backend.py similarity index 91% rename from src/telemetry/backend/tests/test_TelemetryBackend.py rename to src/telemetry/backend/tests/test_backend.py index 665fa825e3ee31b2e92351d9c5855f627ce40fa1..4764d7f5f10aefe211bae840f06eed9c82386bf8 100644 --- a/src/telemetry/backend/tests/test_TelemetryBackend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -34,5 +34,4 @@ LOGGER = logging.getLogger(__name__) def test_RunRequestListener(): LOGGER.info('test_RunRequestListener') TelemetryBackendServiceObj = TelemetryBackendService() - response = threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - LOGGER.debug(str(response)) + threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() \ No newline at end of file diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index a0e93e8a121b9efaac83f7169419911c8ee6e3ea..e6d8ef439f4ad4764c5a6f8b5f36ec68cbb10867 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -22,15 +22,18 @@ from common.proto.kpi_manager_pb2 import KpiId def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() # _collector_id.collector_id.uuid = str(uuid.uuid4()) - _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c" + _collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" return _collector_id def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() - _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.interval_s = float(random.randint(2, 4)) + # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) + _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" + # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = -1 + _create_collector_request.interval_s = float(random.randint(3, 5)) return _create_collector_request def create_collector_filter(): diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 9c3f9d3a8f545792eb2bb3a371c6c20664d24f69..c3f8091c83f56fd4a134ec092b1e22723040595d 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -105,47 +105,10 @@ def test_SelectCollectors(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorList) -# ----- Non-gRPC method tests ----- -def test_RunResponseListener(): - LOGGER.info(' >>> test_RunResponseListener START: <<< ') - TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() - response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto - LOGGER.debug(str(response)) - assert isinstance(response, bool) - -# ------- previous test ---------------- - -# def test_verify_db_and_table(): -# LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') -# _engine = TelemetryEngine.get_engine() -# managementDB.create_database(_engine) -# managementDB.create_tables(_engine) - -# def test_StartCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StartCollector START: <<< ') -# response = telemetryFrontend_client.StartCollector(create_collector_request()) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorId) - -# def test_run_kafka_listener(): -# LOGGER.info(' >>> test_run_kafka_listener START: <<< ') -# name_mapping = NameMapping() -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) -# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto +# # ----- Non-gRPC method tests ----- +# def test_RunResponseListener(): +# LOGGER.info(' >>> test_RunResponseListener START: <<< ') +# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() +# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto # LOGGER.debug(str(response)) # assert isinstance(response, bool) - -# def test_StopCollector(telemetryFrontend_client): -# LOGGER.info(' >>> test_StopCollector START: <<< ') -# _collector_id = telemetryFrontend_client.StartCollector(create_collector_request()) -# time.sleep(3) # wait for small amount before call the stopCollecter() -# response = telemetryFrontend_client.StopCollector(_collector_id) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) - -# def test_select_collectors(telemetryFrontend_client): -# LOGGER.info(' >>> test_select_collector requesting <<< ') -# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) -# LOGGER.info('Received Rows after applying Filter: {:} '.format(response)) -# LOGGER.debug(str(response)) -# assert isinstance(response, CollectorList) \ No newline at end of file diff --git a/src/telemetry/requirements.in b/src/telemetry/requirements.in index a0e78d2bfb7270b9664ad5ba810e2f213d887bf7..4e9981afbcb7460344f160dcecf329f44d14e792 100644 --- a/src/telemetry/requirements.in +++ b/src/telemetry/requirements.in @@ -12,13 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -anytree==2.8.0 APScheduler==3.10.1 -influx-line-protocol==0.1.4 psycopg2-binary==2.9.3 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2024.1 -questdb==1.0.1 -requests==2.27.1 -xmltodict==0.12.0 \ No newline at end of file +requests==2.27.1 \ No newline at end of file