From faef6e3c5aa20152e4955fdd3f2168b08c91efe9 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Wed, 18 Jun 2025 11:35:22 +0200 Subject: [PATCH] Implement gNMI OpenConfig Collector with subscription management and telemetry updates - Added main collector file - Added helper files - Added pytest file --- common_requirements.in | 9 +- scripts/run_tests_locally-telemetry-gnmi.sh | 5 +- .../gnmi_oc/GnmiOpenConfigCollector.py | 164 ++++++++++++++++++ .../backend/service/collectors/gnmi_oc/KPI.py | 32 ++++ .../service/collectors/gnmi_oc/PathMapper.py | 101 +++++++++++ .../collectors/gnmi_oc/SubscriptionNew.py | 144 +++++++++++++++ .../service/collectors/gnmi_oc/__init__.py | 13 ++ .../backend/tests/gnmi_oc/__init__.py | 13 ++ .../backend/tests/gnmi_oc/messages.py | 47 +++++ .../gnmi_oc/test_GnmiOpenConfigCollector.py | 118 +++++++++++++ 10 files changed, 641 insertions(+), 5 deletions(-) create mode 100644 src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py create mode 100644 src/telemetry/backend/service/collectors/gnmi_oc/KPI.py create mode 100644 src/telemetry/backend/service/collectors/gnmi_oc/PathMapper.py create mode 100644 src/telemetry/backend/service/collectors/gnmi_oc/SubscriptionNew.py create mode 100644 src/telemetry/backend/service/collectors/gnmi_oc/__init__.py create mode 100644 src/telemetry/backend/tests/gnmi_oc/__init__.py create mode 100644 src/telemetry/backend/tests/gnmi_oc/messages.py create mode 100644 src/telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py diff --git a/common_requirements.in b/common_requirements.in index 12c6c778d..ee19ba961 100644 --- a/common_requirements.in +++ b/common_requirements.in @@ -13,14 +13,17 @@ # limitations under the License. coverage==6.3 -grpcio==1.47.* +# grpcio==1.47.* +grpcio==1.60.0 grpcio-health-checking==1.47.* grpcio-reflection==1.47.* -grpcio-tools==1.47.* +# grpcio-tools==1.47.* +grpcio-tools==1.60.0 grpclib==0.4.4 prettytable==3.5.0 prometheus-client==0.13.0 -protobuf==3.20.* +# protobuf==3.20.* +protobuf==4.21.6 pytest==6.2.5 pytest-benchmark==3.4.1 python-dateutil==2.8.2 diff --git a/scripts/run_tests_locally-telemetry-gnmi.sh b/scripts/run_tests_locally-telemetry-gnmi.sh index bff0317f6..90e9303e4 100755 --- a/scripts/run_tests_locally-telemetry-gnmi.sh +++ b/scripts/run_tests_locally-telemetry-gnmi.sh @@ -25,5 +25,6 @@ cd $PROJECTDIR/src RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=info --log-cli-level=info --verbose \ - telemetry/backend/tests/gnmi_openconfig/test_gnmi_openconfig_collector.py - + telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py + # telemetry/backend/tests/gnmi_openconfig/test_gnmi_openconfig_collector.py +# /home/cttc/waleed/tfs-ctrl/src/ \ No newline at end of file diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py new file mode 100644 index 000000000..b3f317654 --- /dev/null +++ b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py @@ -0,0 +1,164 @@ +import pytz +from datetime import datetime, timedelta +import logging +from typing import Dict, Optional, Tuple, List, Callable, Union, Any, Iterator +from urllib import response + +from telemetry.backend.service.collector_api._Collector import _Collector +from .PathMapper import PathMapper +from .SubscriptionNew import Subscription +from .KPI import KPI + +import queue +from pygnmi.client import gNMIclient + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s", +) + +class GNMIOpenConfigCollector(_Collector): + """ + GNMI OpenConfig Collector + ========================= + Lightweight wrapper around *pygnmi* with subscribe / get / unsubscribe helpers. + """ + def __init__(self, + username: str = 'admin', password: str = 'admin', insecure: bool = True, + address: str = '', port: int = -1, **setting + ) -> None: + + super().__init__('gNMI_openconfig_collector', address, port, **setting) + self._subscriptions : Dict[str, Subscription] = {} + self.username = username + self.password = password + self.insecure = insecure + + self.connected = False # To track connection state + self.client: Optional[gNMIclient] = None + self._output_queue = queue.Queue() # Queue for telemetry updates + + self.logger = logging.getLogger(__name__) + self.logger.debug("GNMICollector instantiated.") + + + def Connect(self) -> None: + """ + Connect to the gNMI target device. + """ + if not self.connected: + self.client = gNMIclient( + target=(self.address, self.port), + username=self.username, + password=self.password, + insecure=self.insecure + ) + self.client.connect() # type: ignore + self.connected = True + self.logger.info("Connected to gNMI target %s:%s", self.address, self.port) + else: + self.logger.warning("Already connected to gNMI target %s:%s", self.address, self.port) + + def Disconnect(self) -> None: + """ + Disconnect from the gNMI target device. + """ + if self.connected and self.client: + self.client.close() + self.connected = False + self.logger.info("Disconnected from gNMI target %s:%s", self.address, self.port) + else: + self.logger.warning("Not connected to any gNMI target.") + + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]] + ) -> List[Union[bool, Exception]]: + response = [] + for subscription in subscriptions: + try: + # Validate subscription format + if len(subscription) != 4: + raise ValueError(f"Expected 4 elements, got {len(subscription)}") + sub_id, sub_endpoint, duration, interval = subscription + + if not isinstance(sub_endpoint, dict): + raise TypeError("Endpoint must be a dictionary.") + if sub_endpoint.get('endpoint') is None: + raise KeyError("Endpoint dictionary must contain 'endpoint' key.") + if sub_endpoint.get('kpi') is None: + raise KeyError("Endpoint dictionary must contain 'kpi' key.") + if sub_endpoint.get('resource') is None: + raise KeyError("Endpoint dictionary must contain 'resource' key.") + # Convert KPI Id into name + + # kpi_name = KPI.get_kpi_name_by_value(sub_endpoint['kpi']) + paths = PathMapper.build( + endpoint=sub_endpoint['endpoint'], + kpi=sub_endpoint['kpi'], + resource=sub_endpoint['resource'], + ) + + self._subscriptions[sub_id] = Subscription( + sub_id = sub_id, + gnmi_client = self.client, # type: ignore + path_list = paths, # <- list of paths + metric_queue = self._output_queue, + mode = 'stream', # Default mode + sample_interval_ns = int(interval * 1_000_000_000), # Convert seconds to nanoseconds + heartbeat_interval_ns = int(duration * 1_000_000_000), # Convert seconds to nanoseconds + encoding = 'json_ietf', # Default encoding + ) + + self.logger.info("Subscribing to %s with job_id %s ...", sub_endpoint, sub_id) + response.append(True) + except: + self.logger.exception("Invalid subscription format: %s", subscription) + response.append(False) + return response + + def UnsubscribeState(self, resource_key: str) -> bool: + """Stop the given subscription.""" + sub = self._subscriptions.pop(resource_key, None) + if not sub: + self.logger.error("Attempt to unsubscribe unknown id=%s", resource_key) + # raise KeyError(f"Unknown subscription id '{resource_key}'.") + return False + try: sub.stop() + except: + self.logger.exception("Error stopping subscription %s. ", resource_key) + return False + self.logger.info("Unsubscribed from state: %s", resource_key) + return True + + def GetState(self, duration : float, blocking : bool = True, terminate: Optional[queue.Queue] = None + ) -> Iterator[Tuple[float, str, Any]]: + """ + Pull a single telemetry update from the queue. + Returns an iterator that yields (timestamp, resource_key, data). + """ + logging.debug("GetState called with duration=%s, blocking=%s", duration, blocking) + start_time = datetime.now(pytz.utc) + while True: + logging.debug("GetState loop started at %s", start_time) + try: + if terminate and not terminate.empty(): + self.logger.info("Termination signal received, stopping GetState") + break + + elapsed_time = (datetime.now(pytz.utc) - start_time).total_seconds() + if elapsed_time >= duration: + self.logger.info("Duration expired, stopping GetState") + break + + sample = self._output_queue.get(block=blocking, timeout=1 if blocking else 0.1) + self.logger.info(f"Retrieved state sample: {sample}") + yield sample + except queue.Empty: + if not blocking: + self.logger.info("No more samples in queue, exiting GetState") + return None + # sample = self._output_queue.get(block=blocking, timeout=duration if blocking else 0.1) + # yield sample + + # return self._output_queue.get(timeout=duration) if blocking else self._output_queue.get_nowait() + # Note: This method will block until an item is available or the timeout is reached. + \ No newline at end of file diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py b/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py new file mode 100644 index 000000000..e74682b33 --- /dev/null +++ b/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py @@ -0,0 +1,32 @@ +from enum import IntEnum, unique + +@unique +class KPI(IntEnum): # TODO: verify KPI names and codes with KPI proto file. (How many TFS supports) + """Generic KPI codes that map to interface statistics.""" + PACKETS_TRANSMITTED = 101 + PACKETS_RECEIVED = 102 + PACKETS_DROPPED = 103 + BYTES_TRANSMITTED = 201 + BYTES_RECEIVED = 202 + BYTES_DROPPED = 203 + INBAND_POWER = 301 + # TODO: Add more KPIs as needed, + + # @staticmethod + # def get_kpi_name_by_value(kpi_value): + # """ + # Returns the KPI name for a given enum value. + + # Parameters: + # kpi_value (int): The KPI enum value. + + # Returns: + # str: The name of the KPI enum member. + + # Raises: + # ValueError: If the KPI value is not found. + # """ + # for kpi in KPI: + # if kpi.value == kpi_value: + # return kpi.name + # raise ValueError(f"Invalid KPI enum value: {kpi_value}") diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/PathMapper.py b/src/telemetry/backend/service/collectors/gnmi_oc/PathMapper.py new file mode 100644 index 000000000..9d21ea995 --- /dev/null +++ b/src/telemetry/backend/service/collectors/gnmi_oc/PathMapper.py @@ -0,0 +1,101 @@ +import logging +from typing import Dict, List, Optional, Union +from .KPI import KPI + +logger = logging.getLogger(__name__) +# logger.setLevel(logging.INFO) + + +class PathMapper: + """ + Generate **multiple candidate paths** for an interface KPI. + + The mapper is deliberately generic: it knows only + * the leaf names commonly used across OpenConfig flavours, and + * a few prefix variants ('.../state/counters', '.../state'). + + Subscription logic will try each candidate until one succeeds + against the target device. + """ + + # --------------------------------------------------------------# + # Leaf names that can satisfy each KPI # + # --------------------------------------------------------------# + _LEAF_CANDIDATES: Dict[KPI, List[str]] = { + # There are multiple leaf names that can satisfy each KPI but they can be added or removed + # in the future. The list is not exhaustive, but it covers the most common cases + # across OpenConfig implementations. The collector will try each until one succeeds. + # ---- packets --------------------------------------------------- + KPI.PACKETS_TRANSMITTED: [ + "out-pkts", "out-unicast-pkts", "tx-pkts", "packets-output" + ], + KPI.PACKETS_RECEIVED: [ + "in-pkts", "in-unicast-pkts", "rx-pkts", "packets-input" + ], + KPI.PACKETS_DROPPED: [ + "in-discards", "out-discards", "packets-drop" + ], + + # ---- bytes ----------------------------------------------------- + KPI.BYTES_TRANSMITTED: [ + "out-octets", "tx-octets", "bytes-output" + ], + KPI.BYTES_RECEIVED: [ + "in-octets", "rx-octets", "bytes-input" + ], + KPI.BYTES_DROPPED: [ + "in-octets-discarded", "out-octets-discarded", "bytes-drop" + ], + + # ---- power (TODO: List time need to be verified) ------------- + # Note: Inband power is not a standard leaf in OpenConfig, but + # it is included here for completeness. The actual leaf names + # may vary by implementation. + KPI.INBAND_POWER: [ + "inband-power", "inband-power-state" + ], + } + + # --------------------------------------------------------------# + # Prefix variants (no explicit origin) # + # --------------------------------------------------------------# + # More leaf prefixes can be added here if needed. + # The collector will try each prefix with the leaf names. + _PREFIXES = [ + 'interfaces/interface[name={endpoint}]/state/counters/{leaf}', + # 'interfaces/interface[name="{endpoint}"]/state/{leaf}', + ] + # --------------------------------------------------------------# + # Public helper # + # --------------------------------------------------------------# + @classmethod + def build(cls, + endpoint: str, kpi: Union[KPI, int], resource: Optional[str] = None + ) -> List[str]: + """ + Return **a list** of path strings. + + :param endpoint: Interface name, e.g. 'Ethernet0' + :param kpi: KPI enum + :param resource: Interface parameter + """ + try: + kpi_enum = KPI(kpi) + except ValueError as exc: + raise ValueError(f"Unsupported KPI code: {kpi}") from exc + + leaves = cls._LEAF_CANDIDATES.get(kpi_enum, []) + if not leaves: + raise ValueError(f"No leaf candidates for KPI {kpi_enum}") + + paths: List[str] = [] + for leaf in leaves: + if resource == "interface": + for prefix in cls._PREFIXES: + paths.append(prefix.format(endpoint=endpoint, leaf=leaf)) + else: + raise ValueError(f"Unsupported resource: {resource}") + + logger.debug("Built %d candidate path(s) for %s on %s", + len(paths), kpi_enum.name, endpoint) + return paths diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/SubscriptionNew.py b/src/telemetry/backend/service/collectors/gnmi_oc/SubscriptionNew.py new file mode 100644 index 000000000..59087f433 --- /dev/null +++ b/src/telemetry/backend/service/collectors/gnmi_oc/SubscriptionNew.py @@ -0,0 +1,144 @@ +import threading +import logging +from queue import Queue +from typing import Callable, Tuple, Optional, List +from google.protobuf.json_format import MessageToDict + +import grpc # pygnmi uses grpc underneath +from pygnmi.client import gNMIclient # type: ignore + +logger = logging.getLogger(__name__) +# logger.setLevel(logging.INFO) + + +class Subscription: + """ + Handles a gNMI *Subscribe* session. + + It receives a **list of candidate paths**; if the target rejects one + (INVALID_ARGUMENT / unknown path), the thread automatically tries the + next path until it works or the list is exhausted. + """ + + def __init__( + self, + sub_id: str, + gnmi_client: gNMIclient, + path_list: List[str], + metric_queue: Queue, + mode: str = "stream", + sample_interval_ns: int = 10_000_000_000, + heartbeat_interval_ns: int | None = None, # ↠NEW + encoding: str = "json_ietf", + on_update: Optional[Callable[[dict], None]] = None, + ) -> None: + + self.sub_id = sub_id + self.gnmi_client = gnmi_client + self._queue: Queue = metric_queue + self._stop_event = threading.Event() + + self._thread = threading.Thread( + target = self._run, + args = ( + path_list, mode, + sample_interval_ns, heartbeat_interval_ns, encoding, on_update, + ), + name=f"gnmi-sub-{sub_id[:8]}", + daemon=True, + ) + self._thread.start() + logger.info("Started subscription %s",sub_id) + + # --------------------------------------------------------------# + # Public helpers # + # --------------------------------------------------------------# + def get(self, timeout: Optional[float] = None) -> dict: + return self._queue.get(timeout=timeout) + + def stop(self) -> None: + self._stop_event.set() + self._thread.join(2) + logger.info("Stopped subscription %s", self.sub_id) + + # --------------------------------------------------------------# + # Internal loop # + # --------------------------------------------------------------# + def _run( + self, + path_list: List[str], + mode: str, + sample_interval_ns: int, + heartbeat_interval_ns: int | None, + encoding: str, + on_update: Optional[Callable[[dict], None]], + ) -> None: # pragma: no cover + """ + Try each candidate path until the Subscribe RPC succeeds. + + * Top level mode: STREAM / ONCE / POLL (here we always stream) + * Per entry mode: SAMPLE / ON_CHANGE + """ + # --- pick the correct gNMI enum strings ------------------------- + top_mode = "stream" # explicitly stream mode + entry_mode = mode.lower() + + for path in path_list: + if self._stop_event.is_set(): + break + + entry: dict = {"path": path} + + if entry_mode == "sample": + entry["mode"] = "sample" + entry["sample_interval"] = sample_interval_ns + elif entry_mode == "on_change": + entry["mode"] = "on_change" + if heartbeat_interval_ns: + entry["heartbeat_interval"] = heartbeat_interval_ns + else: + entry["mode"] = "target_defined" + + request = { + "subscription": [entry], + "mode": top_mode, + "encoding": encoding, + } + logger.debug("Subscription %s to be requested: %s", self.sub_id, request) + try: + logger.debug("Sub %s attempting path %s", self.sub_id, path) + for stream in self.gnmi_client.subscribe(request): + msg_dict = MessageToDict(stream) + # logger.debug("Stream: %s", msg_dict) + + # Process any update data + if msg_dict.get('update'): # 'update' in msg_dict: + logger.debug("Sub %s got update data", self.sub_id) + if on_update: + on_update(msg_dict) + else: + self._queue.put(msg_dict) + # logger.debug("The update added in queue → %s", msg_dict) + # Put a dummy update if syncResponse is received to prevent timeout + elif msg_dict.get('syncResponse'): # 'syncResponse' in msg_dict: + logger.debug("Sub %s received sync response", self.sub_id) + # Optional: put a notification about the sync + if not on_update: + self._queue.put({"type": "sync_response", "value": True}) + else: + logger.warning("Sub %s received unknown message: %s", self.sub_id, msg_dict) + + except grpc.RpcError as err: + if err.code() == grpc.StatusCode.INVALID_ARGUMENT: + logger.warning("Path '%s' rejected (%s) -- trying next", + path, err.details()) + continue + logger.exception("Subscription %s hit gRPC error: %s", + self.sub_id, err) + break + + except Exception as exc: # pylint: disable=broad-except + logger.exception("Subscription %s failed: %s", self.sub_id, exc) + break + + logger.info("Subscription thread %s terminating", self.sub_id) diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/__init__.py b/src/telemetry/backend/service/collectors/gnmi_oc/__init__.py new file mode 100644 index 000000000..023830645 --- /dev/null +++ b/src/telemetry/backend/service/collectors/gnmi_oc/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/tests/gnmi_oc/__init__.py b/src/telemetry/backend/tests/gnmi_oc/__init__.py new file mode 100644 index 000000000..023830645 --- /dev/null +++ b/src/telemetry/backend/tests/gnmi_oc/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/tests/gnmi_oc/messages.py b/src/telemetry/backend/tests/gnmi_oc/messages.py new file mode 100644 index 000000000..19f790ced --- /dev/null +++ b/src/telemetry/backend/tests/gnmi_oc/messages.py @@ -0,0 +1,47 @@ +from src.telemetry.backend.service.collectors.gnmi_oc.KPI import KPI + +# Test device connection parameters +devices = { + 'device1': { + 'host': '10.1.1.86', + 'port': '6030', + 'username': 'ocnos', + 'password': 'ocnos', + 'insecure': True, + }, + 'device2': { + 'host': '10.1.1.87', + 'port': '6030', + 'username': 'ocnos', + 'password': 'ocnos', + 'insecure': True, + }, + 'device3': { + 'host': '172.20.20.101', + 'port': '6030', + 'username': 'admin', + 'password': 'admin', + 'insecure': True, + }, +} + +def creat_basic_sub_request_parameters( + resource: str = 'interface', + endpoint: str = 'Management0', # 'Ethernet1', + kpi: KPI = KPI.PACKETS_RECEIVED, # It should be KPI Id not name? Need to be replaced with KPI id. +) -> dict: + + device = devices['device3'] + return { + 'target' : (device['host'], device['port']), + 'username' : device['username'], + 'password' : device['password'], + 'connect_timeout' : 15, + 'insecure' : device['insecure'], + 'mode' : 'on_change', # Subscription internal mode posibly: on_change, poll, sample + 'sample_interval_ns': '3s', + 'sample_interval' : '10s', + 'kpi' : kpi, + 'resource' : resource, + 'endpoint' : endpoint, + } diff --git a/src/telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py b/src/telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py new file mode 100644 index 000000000..5f94e6882 --- /dev/null +++ b/src/telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py @@ -0,0 +1,118 @@ +import logging +import time +import pytest +from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector +from .messages import creat_basic_sub_request_parameters + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s", +) +logger = logging.getLogger(__name__) + + +@pytest.fixture +def sub_parameters(): + """Fixture to provide subscription parameters.""" + return creat_basic_sub_request_parameters() + + +@pytest.fixture +def collector(sub_parameters): + """Fixture to create and connect GNMI collector.""" + collector = GNMIOpenConfigCollector( + username = sub_parameters['username'], + password = sub_parameters['password'], + insecure = sub_parameters['insecure'], + address = sub_parameters['target'][0], + port = sub_parameters['target'][1], + ) + collector.Connect() + yield collector + collector.Disconnect() + + +@pytest.fixture +def subscription_data(sub_parameters): + """Fixture to provide subscription data.""" + # It should return a list of tuples with subscription parameters. + return [ + ( + "x123", + { + "kpi" : sub_parameters['kpi'], + "endpoint" : sub_parameters['endpoint'], + "resource" : sub_parameters['resource'], + }, + float(10.0), + float(5.0), + ), + ] + + +def test_collector_connection(collector): + """Test collector connection.""" + logger.info("----- Testing GNMI OpenConfig Collector Connection -----") + assert collector.connected is True + logger.debug("Collector connected: %s", collector.connected) + + +def test_subscription_state(collector, subscription_data): + """Test state subscription.""" + logger.info("----- Testing State Subscription -----") + response = collector.SubscribeState(subscription_data) + logger.info("Subscription started: %s", subscription_data) + assert all(response) and isinstance(response, list) + + +def test_get_state_updates(collector, subscription_data): + """Test getting state updates.""" + logger.info("----- Testing State Updates -----") + collector.SubscribeState(subscription_data) + + logger.info("Requesting state updates for 5 seconds ...") + updates_received = [] + for samples in collector.GetState(duration=5.0, blocking=True): + logger.info("Received state update: %s", samples) + updates_received.append(samples) + + assert len(updates_received) > 0 + + +def test_unsubscribe_state(collector, subscription_data): + """Test unsubscribing from state.""" + logger.info("----- Testing Unsubscribe -----") + collector.SubscribeState(subscription_data) + + time.sleep(2) # Wait briefly for subscription to be active + + response = collector.UnsubscribeState("x123") + logger.info("Unsubscribed from state: %s", subscription_data) + assert response is True + +def test_full_workflow(collector, subscription_data): + """Test complete workflow: subscribe, get updates, unsubscribe.""" + logger.info("----- Testing Full Workflow -----") + + # Subscribe + response1 = collector.SubscribeState(subscription_data) + logger.info("Subscription started: %s", subscription_data) + assert all(response1) and isinstance(response1, list) + + # Get updates + logger.info("Requesting state updates for 5 seconds ...") + updates_received = [] + for samples in collector.GetState(duration=5.0, blocking=True): + logger.info("Received state update: %s", samples) + updates_received.append(samples) + assert len(updates_received) > 0 + # Wait for additional updates + logger.info("Waiting for updates for 5 seconds...") + time.sleep(5) + + # Unsubscribe + response2 = collector.UnsubscribeState("x123") + logger.info("Unsubscribed from state: %s", subscription_data) + assert response2 is True + + logger.info("----- Workflow test completed -----") -- GitLab