Commit 37cae078 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Enhance test documentation: clarify preconditions and add context for test...

Enhance test documentation: clarify preconditions and add context for test cases in gNMI OpenConfig tests
parent 02c85ccc
Loading
Loading
Loading
Loading
+28 −99
Original line number Diff line number Diff line
@@ -33,9 +33,10 @@ LOGGER = logging.getLogger(__name__)
# def test_add_to_topology(context_client, device_client):
#     load_topology(context_client, device_client)

# # ----- Automatically GET and REMOVE Topology and Context -----
# # This will only work if there is not devices and links present in the topology. (Otherwise it will raise forigen key error)
# # First remove all devices and links from the topology through GUI.
# ----- Automatically GET and REMOVE Topology and Context -----
# - This method will remove the topology and context from the DB and only work if there is no devices and links present in the topology.
    # + Otherwise it will raise forigen key error.
    # + First remove all devices and links from the topology through GUI.
# def test_get_and_remove_topology_context(context_client):
#     response = get_topology(context_client = context_client, topology_uuid = "admin", context_uuid = "admin")
#     LOGGER.info(f"Topology: {response}")
@@ -48,14 +49,14 @@ LOGGER = logging.getLogger(__name__)
#     # Remove Topology
#     topology_id.context_id.CopyFrom(context_id)
#     response    = context_client.RemoveTopology(topology_id)
#     # assert response is ""
#     LOGGER.info(f"Topology removed Sucessfully")
#     # Remove Context
#     response    = context_client.RemoveContext(context_id)
#     # assert response is ""
#     LOGGER.info(f"Context removed Sucessfully")

# ----- Set KPI Descriptor -----
# - This method will set a KPI Descriptor in the KPI DB.
    # + In the messages file, add the correct device_uuid that already exist in the topology. 
# def test_SetKpiDescriptor(kpi_manager_client):
#     LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
@@ -143,99 +144,27 @@ def telemetry_backend_service():
# -------------------- ACTUAL TEST CASES -----------------------
# --------------------------------------------------------------

# def test_helper_get_collector_by_kpi_id(kpi_manager_client, context_client):
#     LOGGER.info("Testing get_collector_by_kpi_id...")
# This test validates the correct selection of a collector based on a KPI ID. (Not for Gitlab CI pipeline)
# Preconditions:
    # - A device must be added in Context DB with correct device_id.
        # + Uncomment test_add_to_topology() in helper methods section to add a device.
    # - A KPI Descriptor must be added in KPI DB with correct device_id.
        # + Uncomment test_SetKpiDescriptor() in helper methods section to add a KPI Descriptor.
    # - Kafka should be exposed externally 'kubectl port-forward -n kafka service/kafka-public 9094:9094'.
 
#     driver_factory        = DriverFactory(COLLECTORS)
#     driver_instance_cache = DriverInstanceCache(driver_factory)
def test_helper_get_collector_by_kpi_id(kpi_manager_client, context_client):
    LOGGER.info("Testing get_collector_by_kpi_id...")

#     kpi_id = "6e22f180-ba28-4641-b190-2287bf447777"
#     collector = get_collector_by_kpi_id(
#         kpi_id,
#         kpi_manager_client,
#         context_client,
#         driver_instance_cache
#     )
#     assert collector is not None
#     assert isinstance(collector, GNMIOpenConfigCollector)
#     LOGGER.info(f"Collector for KPI ID {kpi_id} found: {collector.__class__.__name__}")

# def test_InitiateCollectorBackend(telemetry_backend_service):
#     LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...")
#     time.sleep(10)
#     LOGGER.info(" Telemetry Backend Sleep Timer Finished.")


# --------------------------------------------------------------
# -------------------- FIRST ITERATION TEST --------------------
# --------------------------------------------------------------


# def test_collector_connection(collector):
#     """Test collector connection."""
#     LOGGER.info("----- Testing GNMI OpenConfig Collector Connection -----")
#     assert collector.connected is True
#     LOGGER.debug("Collector connected: %s", collector.connected)


# def test_subscription_state(collector, subscription_data):
#     """Test state subscription."""
#     LOGGER.info("----- Testing State Subscription -----")
#     response = collector.SubscribeState(subscription_data)
#     LOGGER.info("Subscription started: %s", subscription_data)
#     assert all(response) and isinstance(response, list)


# def test_get_state_updates(collector, subscription_data):
#     """Test getting state updates."""
#     LOGGER.info("----- Testing State Updates -----")
#     collector.SubscribeState(subscription_data)
    
#     LOGGER.info("Requesting state updates for 5 seconds ...")
#     updates_received = []
#     for samples in collector.GetState(duration=5.0, blocking=True):
#         LOGGER.info("Received state update: %s", samples)
#         updates_received.append(samples)
    
#     assert len(updates_received) > 0


# def test_unsubscribe_state(collector, subscription_data):
#     """Test unsubscribing from state."""
#     LOGGER.info("----- Testing Unsubscribe -----")
#     collector.SubscribeState(subscription_data)
    
#     LOGGER.info("Waiting for subscription to be active...")
#     time.sleep(5)  # Wait briefly for subscription to be active
    
#     sub_id = subscription_data[0][0]  # Extract subscription ID
#     response = collector.UnsubscribeState(sub_id)
#     LOGGER.info("Unsubscribed from state: %s", subscription_data)
#     assert response is True

# def test_full_workflow(collector, subscription_data):
#     """Test complete workflow: subscribe, get updates, unsubscribe."""
#     LOGGER.info("----- Testing Full Workflow -----")
    
#     # Subscribe
#     response1 = collector.SubscribeState(subscription_data)
#     LOGGER.info("Subscription started: %s", subscription_data)
#     assert all(response1) and isinstance(response1, list)
    
#     # Get updates
#     LOGGER.info("Requesting state updates for 5 seconds ...")
#     updates_received = []
#     for samples in collector.GetState(duration=5.0, blocking=True):
#         LOGGER.info("Received state update: %s", samples)
#         updates_received.append(samples)
#     assert len(updates_received) > 0
#     # Wait for additional updates
#     LOGGER.info("Waiting for updates for 5 seconds...")
#     time.sleep(5)
    
#     # Unsubscribe
#     response2 = collector.UnsubscribeState("x123")
#     LOGGER.info("Unsubscribed from state: %s", subscription_data)
#     assert response2 is True
    driver_factory        = DriverFactory(COLLECTORS)
    driver_instance_cache = DriverInstanceCache(driver_factory)

#     LOGGER.info("----- Workflow test completed -----")
    kpi_id = "6e22f180-ba28-4641-b190-2287bf447777"
    collector = get_collector_by_kpi_id(
        kpi_id,
        kpi_manager_client,
        context_client,
        driver_instance_cache
    )
    assert collector is not None
    assert isinstance(collector, GNMIOpenConfigCollector)
    LOGGER.info(f"Collector for KPI ID {kpi_id} found: {collector.__class__.__name__}")
+5 −0
Original line number Diff line number Diff line
@@ -49,6 +49,11 @@ def subscription_data(sub_parameters):
        ),
    ]

# --------------------------------------------------------------
# -------------------- ACTUAL TEST CASES -----------------------
# --------------------------------------------------------------
# Necessary to have a containerLab running with gNMI OpenConfig devices.
    # - See messages.py for more details on the parameters. 

def test_collector_connection(collector):
    """Test collector connection."""