Commit 02c85ccc authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Refactor GNMI OpenConfig tests: update requirements, add new test files, and...

Refactor GNMI OpenConfig tests: update requirements, add new test files, and enhance existing test cases
parent 7353c445
Loading
Loading
Loading
Loading
+0 −3
Original line number Diff line number Diff line
@@ -17,9 +17,6 @@ anytree==2.8.0
APScheduler>=3.10.4
APScheduler>=3.10.4
confluent-kafka==2.3.*
# pyang==2.6.*
# git+https://github.com/robshakir/pyangbind.git
# libyang==2.8.4
kafka-python==2.0.6
numpy==2.0.1
pytz>=2025.2
+1 −1
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@ def load_topology(
    results = descriptor_loader.process()
    # LOGGER.info('Descriptor Load Results: {:s}'.format(str(results)))
    check_descriptor_load_results(results, descriptor_loader)
    # descriptor_loader.validate()
    descriptor_loader.validate()

    # Verify the scenario has no services/slices
    response = context_client.GetContext(ADMIN_CONTEXT_ID)
+18 −0
Original line number Diff line number Diff line
import uuid
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from src.telemetry.backend.service.collectors.gnmi_oc.KPI import KPI

# Test device connection parameters
@@ -45,3 +48,18 @@ def creat_basic_sub_request_parameters(
        'resource'          : resource,
        'endpoint'          : endpoint,
    }

def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
    _create_kpi_request                                    = kpi_manager_pb2.KpiDescriptor()
    # _create_kpi_request.kpi_id.kpi_id.uuid                 = str(uuid.uuid4())
    _create_kpi_request.kpi_id.kpi_id.uuid                 = "6e22f180-ba28-4641-b190-2287bf447777"
    _create_kpi_request.kpi_description                    = descriptor_name
    _create_kpi_request.kpi_sample_type                    = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    # _create_kpi_request.device_id.device_uuid.uuid         = str(uuid.uuid4())
    _create_kpi_request.device_id.device_uuid.uuid         = "a8695f53-ba2e-57bd-b586-edf2b5e054b1"
    _create_kpi_request.service_id.service_uuid.uuid       = 'SERV2'
    _create_kpi_request.slice_id.slice_uuid.uuid           = 'SLC1'
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid     = str(uuid.uuid4())
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' 
    _create_kpi_request.link_id.link_uuid.uuid             = 'LNK1' 
    return _create_kpi_request
 No newline at end of file
+0 −126
Original line number Diff line number Diff line
import logging
import time
import pytest
from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
from .messages import creat_basic_sub_request_parameters

# Integration imports
from ..Fixtures import context_client, device_client, service_client, kpi_manager_client
from ..add_devices import load_topology

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s",
)
logger = logging.getLogger(__name__)

# ----- Add Topology -----
def test_add_to_topology(context_client, device_client, service_client):
    load_topology(context_client, device_client)


@pytest.fixture
def sub_parameters():
    """Fixture to provide subscription parameters."""
    return creat_basic_sub_request_parameters()


@pytest.fixture
def collector(sub_parameters):
    """Fixture to create and connect GNMI collector."""
    collector = GNMIOpenConfigCollector(
        username = sub_parameters['username'],
        password = sub_parameters['password'],
        insecure = sub_parameters['insecure'],
        address  = sub_parameters['target'][0],
        port     = sub_parameters['target'][1],
    )
    collector.Connect()
    yield collector
    collector.Disconnect()


@pytest.fixture
def subscription_data(sub_parameters):
    """Fixture to provide subscription data."""
    # It should return a list of tuples with subscription parameters.
    return [
        (
            "x123",
            {
                "kpi"      : sub_parameters['kpi'],
                "endpoint" : sub_parameters['endpoint'],
                "resource" : sub_parameters['resource'],
            },
            float(10.0),
            float(5.0),
        ),
    ]


def test_collector_connection(collector):
    """Test collector connection."""
    logger.info("----- Testing GNMI OpenConfig Collector Connection -----")
    assert collector.connected is True
    logger.debug("Collector connected: %s", collector.connected)


def test_subscription_state(collector, subscription_data):
    """Test state subscription."""
    logger.info("----- Testing State Subscription -----")
    response = collector.SubscribeState(subscription_data)
    logger.info("Subscription started: %s", subscription_data)
    assert all(response) and isinstance(response, list)


def test_get_state_updates(collector, subscription_data):
    """Test getting state updates."""
    logger.info("----- Testing State Updates -----")
    collector.SubscribeState(subscription_data)
    
    logger.info("Requesting state updates for 5 seconds ...")
    updates_received = []
    for samples in collector.GetState(duration=5.0, blocking=True):
        logger.info("Received state update: %s", samples)
        updates_received.append(samples)
    
    assert len(updates_received) > 0


def test_unsubscribe_state(collector, subscription_data):
    """Test unsubscribing from state."""
    logger.info("----- Testing Unsubscribe -----")
    collector.SubscribeState(subscription_data)
    
    time.sleep(2)  # Wait briefly for subscription to be active
    
    response = collector.UnsubscribeState("x123")
    logger.info("Unsubscribed from state: %s", subscription_data)
    assert response is True

def test_full_workflow(collector, subscription_data):
    """Test complete workflow: subscribe, get updates, unsubscribe."""
    logger.info("----- Testing Full Workflow -----")
    
    # Subscribe
    response1 = collector.SubscribeState(subscription_data)
    logger.info("Subscription started: %s", subscription_data)
    assert all(response1) and isinstance(response1, list)
    
    # Get updates
    logger.info("Requesting state updates for 5 seconds ...")
    updates_received = []
    for samples in collector.GetState(duration=5.0, blocking=True):
        logger.info("Received state update: %s", samples)
        updates_received.append(samples)
    assert len(updates_received) > 0
    # Wait for additional updates
    logger.info("Waiting for updates for 5 seconds...")
    time.sleep(5)
    
    # Unsubscribe
    response2 = collector.UnsubscribeState("x123")
    logger.info("Unsubscribed from state: %s", subscription_data)
    assert response2 is True
    
    logger.info("----- Workflow test completed -----")
+241 −0
Original line number Diff line number Diff line
import logging
import pytest
import time

from ..add_devices import load_topology
from ..Fixtures import context_client, device_client, service_client, kpi_manager_client
from .messages import creat_basic_sub_request_parameters
from .messages import create_kpi_descriptor_request
from common.proto.context_pb2 import TopologyId, ContextId, Empty
from common.proto.kpi_manager_pb2 import KpiId
from common.tools.context_queries.Topology import get_topology
from common.tools.kafka.Variables import KafkaTopic

from telemetry.backend.service.HelperMethods import get_collector_by_kpi_id
from telemetry.backend.service.collector_api.DriverFactory import DriverFactory
from telemetry.backend.service.collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers
from telemetry.backend.service.collectors import COLLECTORS
from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService


logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s",
)
LOGGER = logging.getLogger(__name__)

# --------------------------------------------------------------
# -------------------- EXTRA HELPER METHODS --------------------
# --------------------------------------------------------------

# ----- Add Topology -----
# def test_add_to_topology(context_client, device_client):
#     load_topology(context_client, device_client)

# # ----- Automatically GET and REMOVE Topology and Context -----
# # This will only work if there is not devices and links present in the topology. (Otherwise it will raise forigen key error)
# # First remove all devices and links from the topology through GUI.
# def test_get_and_remove_topology_context(context_client):
#     response = get_topology(context_client = context_client, topology_uuid = "admin", context_uuid = "admin")
#     LOGGER.info(f"Topology: {response}")
#     assert response is not None
#     # create context_id and topology_id from response
#     context_id  = ContextId()
#     context_id  = response.topology_id.context_id
#     topology_id = TopologyId()
#     topology_id = response.topology_id
#     # Remove Topology
#     topology_id.context_id.CopyFrom(context_id)
#     response    = context_client.RemoveTopology(topology_id)
#     # assert response is ""
#     LOGGER.info(f"Topology removed Sucessfully")
#     # Remove Context
#     response    = context_client.RemoveContext(context_id)
#     # assert response is ""
#     LOGGER.info(f"Context removed Sucessfully")

# ----- Set KPI Descriptor -----
# def test_SetKpiDescriptor(kpi_manager_client):
#     LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
#     LOGGER.info("Response gRPC message object: {:}".format(response))
#     assert isinstance(response, KpiId)

# --------------------------------------------------------------
# -------------------- REQUIRE FIXTURES ------------------------
# --------------------------------------------------------------

@pytest.fixture(autouse=True)
def log_all_methods(request):
    '''
    This fixture logs messages before and after each test function runs, indicating the start and end of the test.
    The autouse=True parameter ensures that this logging happens automatically for all tests in the module.
    '''
    LOGGER.info(f" >>>>> Starting test: {request.node.name} ")
    yield
    LOGGER.info(f" <<<<< Finished test: {request.node.name} ")


@pytest.fixture
def sub_parameters():
    """Fixture to provide subscription parameters."""
    return creat_basic_sub_request_parameters()


@pytest.fixture
def collector(sub_parameters):
    """Fixture to create and connect GNMI collector."""
    collector = GNMIOpenConfigCollector(
        username = sub_parameters['username'],
        password = sub_parameters['password'],
        insecure = sub_parameters['insecure'],
        address  = sub_parameters['target'][0],
        port     = sub_parameters['target'][1],
    )
    collector.Connect()
    yield collector
    collector.Disconnect()


@pytest.fixture
def subscription_data(sub_parameters):
    """Fixture to provide subscription data."""
    # It should return a list of tuples with subscription parameters.
    return [
        (
            "sub_id_123",
            {
                "kpi"      : sub_parameters['kpi'],
                "endpoint" : sub_parameters['endpoint'],
                "resource" : sub_parameters['resource'],
            },
            float(10.0),
            float(5.0),
        ),
    ]

@pytest.fixture
def telemetry_backend_service():
    LOGGER.info('Initializing TelemetryBackendService...')

    KafkaTopic.create_all_topics()

    # Initialize Driver framework
    driver_factory        = DriverFactory(COLLECTORS)
    driver_instance_cache = DriverInstanceCache(driver_factory)

    _service              = TelemetryBackendService(driver_instance_cache)
    _service.start()

    LOGGER.info('Preloading collectors...')
    preload_drivers(driver_instance_cache)

    LOGGER.info('Yielding TelemetryBackendService...')
    yield _service

    LOGGER.info('Terminating TelemetryBackendService...')
    _service.stop()
    LOGGER.info('Terminated TelemetryBackendService...')


# --------------------------------------------------------------
# -------------------- ACTUAL TEST CASES -----------------------
# --------------------------------------------------------------

# def test_helper_get_collector_by_kpi_id(kpi_manager_client, context_client):
#     LOGGER.info("Testing get_collector_by_kpi_id...")

#     driver_factory        = DriverFactory(COLLECTORS)
#     driver_instance_cache = DriverInstanceCache(driver_factory)

#     kpi_id = "6e22f180-ba28-4641-b190-2287bf447777"
#     collector = get_collector_by_kpi_id(
#         kpi_id,
#         kpi_manager_client,
#         context_client,
#         driver_instance_cache
#     )
#     assert collector is not None
#     assert isinstance(collector, GNMIOpenConfigCollector)
#     LOGGER.info(f"Collector for KPI ID {kpi_id} found: {collector.__class__.__name__}")

# def test_InitiateCollectorBackend(telemetry_backend_service):
#     LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...")
#     time.sleep(10)
#     LOGGER.info(" Telemetry Backend Sleep Timer Finished.")


# --------------------------------------------------------------
# -------------------- FIRST ITERATION TEST --------------------
# --------------------------------------------------------------


# def test_collector_connection(collector):
#     """Test collector connection."""
#     LOGGER.info("----- Testing GNMI OpenConfig Collector Connection -----")
#     assert collector.connected is True
#     LOGGER.debug("Collector connected: %s", collector.connected)


# def test_subscription_state(collector, subscription_data):
#     """Test state subscription."""
#     LOGGER.info("----- Testing State Subscription -----")
#     response = collector.SubscribeState(subscription_data)
#     LOGGER.info("Subscription started: %s", subscription_data)
#     assert all(response) and isinstance(response, list)


# def test_get_state_updates(collector, subscription_data):
#     """Test getting state updates."""
#     LOGGER.info("----- Testing State Updates -----")
#     collector.SubscribeState(subscription_data)
    
#     LOGGER.info("Requesting state updates for 5 seconds ...")
#     updates_received = []
#     for samples in collector.GetState(duration=5.0, blocking=True):
#         LOGGER.info("Received state update: %s", samples)
#         updates_received.append(samples)
    
#     assert len(updates_received) > 0


# def test_unsubscribe_state(collector, subscription_data):
#     """Test unsubscribing from state."""
#     LOGGER.info("----- Testing Unsubscribe -----")
#     collector.SubscribeState(subscription_data)
    
#     LOGGER.info("Waiting for subscription to be active...")
#     time.sleep(5)  # Wait briefly for subscription to be active
    
#     sub_id = subscription_data[0][0]  # Extract subscription ID
#     response = collector.UnsubscribeState(sub_id)
#     LOGGER.info("Unsubscribed from state: %s", subscription_data)
#     assert response is True

# def test_full_workflow(collector, subscription_data):
#     """Test complete workflow: subscribe, get updates, unsubscribe."""
#     LOGGER.info("----- Testing Full Workflow -----")
    
#     # Subscribe
#     response1 = collector.SubscribeState(subscription_data)
#     LOGGER.info("Subscription started: %s", subscription_data)
#     assert all(response1) and isinstance(response1, list)
    
#     # Get updates
#     LOGGER.info("Requesting state updates for 5 seconds ...")
#     updates_received = []
#     for samples in collector.GetState(duration=5.0, blocking=True):
#         LOGGER.info("Received state update: %s", samples)
#         updates_received.append(samples)
#     assert len(updates_received) > 0
#     # Wait for additional updates
#     LOGGER.info("Waiting for updates for 5 seconds...")
#     time.sleep(5)
    
#     # Unsubscribe
#     response2 = collector.UnsubscribeState("x123")
#     LOGGER.info("Unsubscribed from state: %s", subscription_data)
#     assert response2 is True
    
#     LOGGER.info("----- Workflow test completed -----")
Loading