Loading src/tests/ofc26_flexscale/test_ofc26_mgon_integration_V2.py +14 −24 Original line number Diff line number Diff line Loading @@ -52,7 +52,6 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client # 1. KPI Descriptor Creation LOGGER.info(" >>> test_Complete_MGON_Integration: START <<< ") kpi_descriptor_obj = create_kpi_descriptor_request() _search_kpi_id = kpi_manager_pb2.KpiId() _search_kpi_id = kpi_descriptor_obj.kpi_id try: Loading @@ -60,7 +59,7 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client if isinstance(response, kpi_manager_pb2.KpiDescriptor): LOGGER.info("KPI Descriptor already exists with ID: %s. Skipping creation.", _search_kpi_id.kpi_id.uuid) except Exception as e: LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it.", _search_kpi_id.kpi_id.uuid) LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it. Error: %s", _search_kpi_id.kpi_id.uuid, str(e)) response = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_obj) LOGGER.info("Response gRPC message object: {:}".format(response)) assert isinstance(response, KpiId) Loading Loading @@ -98,14 +97,15 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client args=( _coll_id, _collector, None, _duration, _interval, None, None, None, None, stop_event ), daemon=False ) ) collector_thread.start() def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): LOGGER.warning(f"Execution duration ({completion_time}) completed for Collector: {_coll_id}") if stop_event: stop_event.set() duration_thread = threading.Thread( Loading @@ -113,28 +113,18 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client args=(_duration, stop_event) ) duration_thread.start() # LOGGER.info("----- Testing State Subscription -----") # sub_data = [( # "x123", # { # "kpi" : sub_parameters['kpi'], # "endpoint" : sub_parameters['endpoint'], # "resource" : sub_parameters['resource'], # }, # sub_parameters['duration'], # sub_parameters['sample_interval'], # ),] LOGGER.info("Sleeping for %d seconds...", _duration) time.sleep(_duration) # LOGGER.info("Subscription started: Status: %s, Data: %s", response, sub_data) LOGGER.info("Setting stop event for Collector: %s", _coll_id) stop_event.set() # Wait for collector thread to complete collector_thread.join(timeout=10) if collector_thread.is_alive(): LOGGER.warning("Collector thread did not terminate within timeout") LOGGER.info("Sleeping...") time.sleep(600) if stop_event: stop_event.set() LOGGER.info("Done sleeping.") LOGGER.info(" >>> test_Complete_MGON_Integration: END <<< ") Loading Loading
src/tests/ofc26_flexscale/test_ofc26_mgon_integration_V2.py +14 −24 Original line number Diff line number Diff line Loading @@ -52,7 +52,6 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client # 1. KPI Descriptor Creation LOGGER.info(" >>> test_Complete_MGON_Integration: START <<< ") kpi_descriptor_obj = create_kpi_descriptor_request() _search_kpi_id = kpi_manager_pb2.KpiId() _search_kpi_id = kpi_descriptor_obj.kpi_id try: Loading @@ -60,7 +59,7 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client if isinstance(response, kpi_manager_pb2.KpiDescriptor): LOGGER.info("KPI Descriptor already exists with ID: %s. Skipping creation.", _search_kpi_id.kpi_id.uuid) except Exception as e: LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it.", _search_kpi_id.kpi_id.uuid) LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it. Error: %s", _search_kpi_id.kpi_id.uuid, str(e)) response = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_obj) LOGGER.info("Response gRPC message object: {:}".format(response)) assert isinstance(response, KpiId) Loading Loading @@ -98,14 +97,15 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client args=( _coll_id, _collector, None, _duration, _interval, None, None, None, None, stop_event ), daemon=False ) ) collector_thread.start() def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): LOGGER.warning(f"Execution duration ({completion_time}) completed for Collector: {_coll_id}") if stop_event: stop_event.set() duration_thread = threading.Thread( Loading @@ -113,28 +113,18 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client args=(_duration, stop_event) ) duration_thread.start() # LOGGER.info("----- Testing State Subscription -----") # sub_data = [( # "x123", # { # "kpi" : sub_parameters['kpi'], # "endpoint" : sub_parameters['endpoint'], # "resource" : sub_parameters['resource'], # }, # sub_parameters['duration'], # sub_parameters['sample_interval'], # ),] LOGGER.info("Sleeping for %d seconds...", _duration) time.sleep(_duration) # LOGGER.info("Subscription started: Status: %s, Data: %s", response, sub_data) LOGGER.info("Setting stop event for Collector: %s", _coll_id) stop_event.set() # Wait for collector thread to complete collector_thread.join(timeout=10) if collector_thread.is_alive(): LOGGER.warning("Collector thread did not terminate within timeout") LOGGER.info("Sleeping...") time.sleep(600) if stop_event: stop_event.set() LOGGER.info("Done sleeping.") LOGGER.info(" >>> test_Complete_MGON_Integration: END <<< ") Loading