Skip to content
Snippets Groups Projects
Commit d0d2caf8 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Refactor collector initialization and add tests for gNMI OpenConfig Collector functionality

parent faef6e3c
No related branches found
No related tags found
1 merge request!289Draft: Resolve "(CTTC) Implement Telemetry Backend Collector gNMI/OpenConfig"
...@@ -17,10 +17,10 @@ from common.proto.context_pb2 import DeviceDriverEnum ...@@ -17,10 +17,10 @@ from common.proto.context_pb2 import DeviceDriverEnum
from telemetry.backend.Config import LOAD_ALL_DEVICE_DRIVERS from telemetry.backend.Config import LOAD_ALL_DEVICE_DRIVERS
from ..collector_api.FilterFields import FilterFieldEnum from ..collector_api.FilterFields import FilterFieldEnum
DRIVERS = [] COLLECTOR = []
from .emulated.EmulatedCollector import EmulatedCollector # pylint: disable=wrong-import-position from .emulated.EmulatedCollector import EmulatedCollector # pylint: disable=wrong-import-position
DRIVERS.append( COLLECTOR.append(
(EmulatedCollector, [ (EmulatedCollector, [
# TODO: multi-filter is not working # TODO: multi-filter is not working
{ {
...@@ -35,13 +35,13 @@ DRIVERS.append( ...@@ -35,13 +35,13 @@ DRIVERS.append(
])) ]))
if LOAD_ALL_DEVICE_DRIVERS: if LOAD_ALL_DEVICE_DRIVERS:
from .gnmi_openconfig.GnmiOpenConfigCollector import GnmiOpenConfigCollector # pylint: disable=wrong-import-position from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position
DRIVERS.append( COLLECTOR.append(
(GnmiOpenConfigCollector, [ (GNMIOpenConfigCollector, [
{ {
# Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver # Real Packet Router, specifying GNMI Driver => use GnmiDriver
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER,
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_GNMI,
} }
])) ]))
import logging
import time
import pytest
from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
from .messages import creat_basic_sub_request_parameters
# Integration imports
from ..Fixtures import context_client, device_client, service_client, kpi_manager_client
from ..add_devices import load_topology
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s",
)
logger = logging.getLogger(__name__)
# ----- Add Topology -----
def test_add_to_topology(context_client, device_client, service_client):
load_topology(context_client, device_client)
@pytest.fixture
def sub_parameters():
"""Fixture to provide subscription parameters."""
return creat_basic_sub_request_parameters()
@pytest.fixture
def collector(sub_parameters):
"""Fixture to create and connect GNMI collector."""
collector = GNMIOpenConfigCollector(
username = sub_parameters['username'],
password = sub_parameters['password'],
insecure = sub_parameters['insecure'],
address = sub_parameters['target'][0],
port = sub_parameters['target'][1],
)
collector.Connect()
yield collector
collector.Disconnect()
@pytest.fixture
def subscription_data(sub_parameters):
"""Fixture to provide subscription data."""
# It should return a list of tuples with subscription parameters.
return [
(
"x123",
{
"kpi" : sub_parameters['kpi'],
"endpoint" : sub_parameters['endpoint'],
"resource" : sub_parameters['resource'],
},
float(10.0),
float(5.0),
),
]
def test_collector_connection(collector):
"""Test collector connection."""
logger.info("----- Testing GNMI OpenConfig Collector Connection -----")
assert collector.connected is True
logger.debug("Collector connected: %s", collector.connected)
def test_subscription_state(collector, subscription_data):
"""Test state subscription."""
logger.info("----- Testing State Subscription -----")
response = collector.SubscribeState(subscription_data)
logger.info("Subscription started: %s", subscription_data)
assert all(response) and isinstance(response, list)
def test_get_state_updates(collector, subscription_data):
"""Test getting state updates."""
logger.info("----- Testing State Updates -----")
collector.SubscribeState(subscription_data)
logger.info("Requesting state updates for 5 seconds ...")
updates_received = []
for samples in collector.GetState(duration=5.0, blocking=True):
logger.info("Received state update: %s", samples)
updates_received.append(samples)
assert len(updates_received) > 0
def test_unsubscribe_state(collector, subscription_data):
"""Test unsubscribing from state."""
logger.info("----- Testing Unsubscribe -----")
collector.SubscribeState(subscription_data)
time.sleep(2) # Wait briefly for subscription to be active
response = collector.UnsubscribeState("x123")
logger.info("Unsubscribed from state: %s", subscription_data)
assert response is True
def test_full_workflow(collector, subscription_data):
"""Test complete workflow: subscribe, get updates, unsubscribe."""
logger.info("----- Testing Full Workflow -----")
# Subscribe
response1 = collector.SubscribeState(subscription_data)
logger.info("Subscription started: %s", subscription_data)
assert all(response1) and isinstance(response1, list)
# Get updates
logger.info("Requesting state updates for 5 seconds ...")
updates_received = []
for samples in collector.GetState(duration=5.0, blocking=True):
logger.info("Received state update: %s", samples)
updates_received.append(samples)
assert len(updates_received) > 0
# Wait for additional updates
logger.info("Waiting for updates for 5 seconds...")
time.sleep(5)
# Unsubscribe
response2 = collector.UnsubscribeState("x123")
logger.info("Unsubscribed from state: %s", subscription_data)
assert response2 is True
logger.info("----- Workflow test completed -----")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment