diff --git a/src/telemetry/backend/service/collectors/__init__.py b/src/telemetry/backend/service/collectors/__init__.py index 084207f5107322636ef0bee4ff1b7ed3e5efc6df..151027e14f7b065e520dd3e4a42e252d76d1ebc9 100644 --- a/src/telemetry/backend/service/collectors/__init__.py +++ b/src/telemetry/backend/service/collectors/__init__.py @@ -17,10 +17,10 @@ from common.proto.context_pb2 import DeviceDriverEnum from telemetry.backend.Config import LOAD_ALL_DEVICE_DRIVERS from ..collector_api.FilterFields import FilterFieldEnum -DRIVERS = [] +COLLECTOR = [] from .emulated.EmulatedCollector import EmulatedCollector # pylint: disable=wrong-import-position -DRIVERS.append( +COLLECTOR.append( (EmulatedCollector, [ # TODO: multi-filter is not working { @@ -35,13 +35,13 @@ DRIVERS.append( ])) if LOAD_ALL_DEVICE_DRIVERS: - from .gnmi_openconfig.GnmiOpenConfigCollector import GnmiOpenConfigCollector # pylint: disable=wrong-import-position - DRIVERS.append( - (GnmiOpenConfigCollector, [ + from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position + COLLECTOR.append( + (GNMIOpenConfigCollector, [ { - # Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver + # Real Packet Router, specifying GNMI Driver => use GnmiDriver FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, - FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_GNMI, } ])) diff --git a/src/telemetry/backend/tests/gnmi_oc/test_complete_GnmiOCcollector.py b/src/telemetry/backend/tests/gnmi_oc/test_complete_GnmiOCcollector.py new file mode 100644 index 0000000000000000000000000000000000000000..78a0c49d3832215b5dd9d639cd3cc8a262c19011 --- /dev/null +++ b/src/telemetry/backend/tests/gnmi_oc/test_complete_GnmiOCcollector.py @@ -0,0 +1,126 @@ +import logging +import time +import pytest +from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector +from .messages import creat_basic_sub_request_parameters + +# Integration imports +from ..Fixtures import context_client, device_client, service_client, kpi_manager_client +from ..add_devices import load_topology + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s", +) +logger = logging.getLogger(__name__) + +# ----- Add Topology ----- +def test_add_to_topology(context_client, device_client, service_client): + load_topology(context_client, device_client) + + +@pytest.fixture +def sub_parameters(): + """Fixture to provide subscription parameters.""" + return creat_basic_sub_request_parameters() + + +@pytest.fixture +def collector(sub_parameters): + """Fixture to create and connect GNMI collector.""" + collector = GNMIOpenConfigCollector( + username = sub_parameters['username'], + password = sub_parameters['password'], + insecure = sub_parameters['insecure'], + address = sub_parameters['target'][0], + port = sub_parameters['target'][1], + ) + collector.Connect() + yield collector + collector.Disconnect() + + +@pytest.fixture +def subscription_data(sub_parameters): + """Fixture to provide subscription data.""" + # It should return a list of tuples with subscription parameters. + return [ + ( + "x123", + { + "kpi" : sub_parameters['kpi'], + "endpoint" : sub_parameters['endpoint'], + "resource" : sub_parameters['resource'], + }, + float(10.0), + float(5.0), + ), + ] + + +def test_collector_connection(collector): + """Test collector connection.""" + logger.info("----- Testing GNMI OpenConfig Collector Connection -----") + assert collector.connected is True + logger.debug("Collector connected: %s", collector.connected) + + +def test_subscription_state(collector, subscription_data): + """Test state subscription.""" + logger.info("----- Testing State Subscription -----") + response = collector.SubscribeState(subscription_data) + logger.info("Subscription started: %s", subscription_data) + assert all(response) and isinstance(response, list) + + +def test_get_state_updates(collector, subscription_data): + """Test getting state updates.""" + logger.info("----- Testing State Updates -----") + collector.SubscribeState(subscription_data) + + logger.info("Requesting state updates for 5 seconds ...") + updates_received = [] + for samples in collector.GetState(duration=5.0, blocking=True): + logger.info("Received state update: %s", samples) + updates_received.append(samples) + + assert len(updates_received) > 0 + + +def test_unsubscribe_state(collector, subscription_data): + """Test unsubscribing from state.""" + logger.info("----- Testing Unsubscribe -----") + collector.SubscribeState(subscription_data) + + time.sleep(2) # Wait briefly for subscription to be active + + response = collector.UnsubscribeState("x123") + logger.info("Unsubscribed from state: %s", subscription_data) + assert response is True + +def test_full_workflow(collector, subscription_data): + """Test complete workflow: subscribe, get updates, unsubscribe.""" + logger.info("----- Testing Full Workflow -----") + + # Subscribe + response1 = collector.SubscribeState(subscription_data) + logger.info("Subscription started: %s", subscription_data) + assert all(response1) and isinstance(response1, list) + + # Get updates + logger.info("Requesting state updates for 5 seconds ...") + updates_received = [] + for samples in collector.GetState(duration=5.0, blocking=True): + logger.info("Received state update: %s", samples) + updates_received.append(samples) + assert len(updates_received) > 0 + # Wait for additional updates + logger.info("Waiting for updates for 5 seconds...") + time.sleep(5) + + # Unsubscribe + response2 = collector.UnsubscribeState("x123") + logger.info("Unsubscribed from state: %s", subscription_data) + assert response2 is True + + logger.info("----- Workflow test completed -----")