Commit faef6e3c authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Implement gNMI OpenConfig Collector with subscription management and telemetry updates

- Added main collector file
- Added helper files
- Added pytest file
parent b0bf67e4
Loading
Loading
Loading
Loading
+6 −3
Original line number Diff line number Diff line
@@ -13,14 +13,17 @@
# limitations under the License.

coverage==6.3
grpcio==1.47.*
# grpcio==1.47.*
grpcio==1.60.0
grpcio-health-checking==1.47.*
grpcio-reflection==1.47.*
grpcio-tools==1.47.*
# grpcio-tools==1.47.*
grpcio-tools==1.60.0
grpclib==0.4.4
prettytable==3.5.0
prometheus-client==0.13.0
protobuf==3.20.*
# protobuf==3.20.*
protobuf==4.21.6
pytest==6.2.5
pytest-benchmark==3.4.1
python-dateutil==2.8.2
+3 −2
Original line number Diff line number Diff line
@@ -25,5 +25,6 @@ cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc

python3 -m pytest --log-level=info --log-cli-level=info --verbose \
    telemetry/backend/tests/gnmi_openconfig/test_gnmi_openconfig_collector.py
    telemetry/backend/tests/gnmi_oc/test_GnmiOpenConfigCollector.py
    # telemetry/backend/tests/gnmi_openconfig/test_gnmi_openconfig_collector.py
# /home/cttc/waleed/tfs-ctrl/src/
 No newline at end of file
+164 −0
Original line number Diff line number Diff line
import pytz
from datetime import datetime, timedelta
import logging
from typing import Dict, Optional, Tuple, List, Callable, Union, Any, Iterator
from urllib import response

from telemetry.backend.service.collector_api._Collector import _Collector
from .PathMapper import PathMapper
from .SubscriptionNew import Subscription
from .KPI import KPI

import queue
from pygnmi.client import gNMIclient

# Root-logger configuration applied when this module is imported: DEBUG level,
# and every record shows timestamp, level, logger name and calling function.
_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s - %(funcName)s()]: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

class GNMIOpenConfigCollector(_Collector):
    """
    GNMI OpenConfig Collector
    =========================
    Lightweight wrapper around *pygnmi* with subscribe / get / unsubscribe helpers.

    Lifecycle: ``Connect()`` -> ``SubscribeState(...)`` -> iterate ``GetState(...)``
    -> ``UnsubscribeState(...)`` -> ``Disconnect()``.

    Every ``Subscription`` created by this collector receives the same output
    queue (``metric_queue``); ``GetState`` reads samples back from that queue.
    """

    # Keys that every subscription endpoint dictionary must provide (non-None).
    _REQUIRED_ENDPOINT_KEYS = ('endpoint', 'kpi', 'resource')

    def __init__(self,
                 username: str = 'admin', password: str = 'admin', insecure: bool = True,
                 address: str = '', port: int = -1, **setting
                 ) -> None:
        """
        Create a collector; no network activity happens until ``Connect()``.

        :param username: User name forwarded to ``gNMIclient``.
        :param password: Password forwarded to ``gNMIclient``.
        :param insecure: ``insecure`` flag forwarded to ``gNMIclient``.
        :param address:  Target address, forwarded to the base collector.
        :param port:     Target port, forwarded to the base collector.
        :param setting:  Extra settings forwarded to the base collector.
        """
        super().__init__('gNMI_openconfig_collector', address, port, **setting)
        self._subscriptions : Dict[str, Subscription] = {}
        self.username = username
        self.password = password
        self.insecure = insecure

        self.connected = False          # To track connection state
        self.client: Optional[gNMIclient] = None
        self._output_queue = queue.Queue()  # Queue for telemetry updates

        self.logger    = logging.getLogger(__name__)
        self.logger.debug("GNMICollector instantiated.")

    def Connect(self) -> None:
        """
        Connect to the gNMI target device.

        Calling this while already connected only logs a warning. If the
        connection attempt raises, the exception propagates and the collector
        stays in the disconnected state.
        """
        if self.connected:
            self.logger.warning("Already connected to gNMI target %s:%s", self.address, self.port)
            return

        client = gNMIclient(
            target=(self.address, self.port),
            username=self.username,
            password=self.password,
            insecure=self.insecure
        )
        client.connect()                # type: ignore
        # Publish the client only after connect() succeeded, so a failed
        # attempt never leaves a half-initialised client on the instance.
        self.client = client
        self.connected = True
        self.logger.info("Connected to gNMI target %s:%s", self.address, self.port)

    def Disconnect(self) -> None:
        """
        Disconnect from the gNMI target device.

        All registered subscriptions are stopped and forgotten before the
        client is closed. Connection state is reset even if ``close()`` raises.
        """
        if not self.connected or self.client is None:
            self.logger.warning("Not connected to any gNMI target.")
            return

        # Stop subscriptions first: they were created with this client.
        for sub_id in list(self._subscriptions):
            self._stop_subscription(sub_id, self._subscriptions.pop(sub_id))

        try:
            self.client.close()
        finally:
            self.client = None
            self.connected = False
        self.logger.info("Disconnected from gNMI target %s:%s", self.address, self.port)

    def _stop_subscription(self, sub_id: str, sub: Subscription) -> bool:
        """Stop *sub*; log (do not raise) any failure. Return True on success."""
        try:
            sub.stop()
        except Exception:
            self.logger.exception("Error stopping subscription %s. ", sub_id)
            return False
        return True

    def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]
                  ) -> List[Union[bool, Exception]]:
        """
        Create one ``Subscription`` per requested entry.

        :param subscriptions: Tuples of ``(sub_id, endpoint_dict, duration, interval)``.
            ``endpoint_dict`` must contain non-None ``'endpoint'``, ``'kpi'`` and
            ``'resource'`` entries; ``duration`` and ``interval`` are in seconds.
        :return: One entry per input, in order: ``True`` when the subscription
            was created, ``False`` otherwise (the failure is logged with its
            traceback; nothing is raised).

        A ``sub_id`` that is already registered is stopped and replaced.
        """
        results: List[Union[bool, Exception]] = []
        for subscription in subscriptions:
            try:
                if self.client is None or not self.connected:
                    raise ConnectionError("Not connected to a gNMI target; call Connect() first.")

                # Validate subscription format
                if len(subscription) != 4:
                    raise ValueError(f"Expected 4 elements, got {len(subscription)}")
                sub_id, sub_endpoint, duration, interval = subscription

                if not isinstance(sub_endpoint, dict):
                    raise TypeError("Endpoint must be a dictionary.")
                for key in self._REQUIRED_ENDPOINT_KEYS:
                    if sub_endpoint.get(key) is None:
                        raise KeyError(f"Endpoint dictionary must contain '{key}' key.")

                paths = PathMapper.build(
                    endpoint=sub_endpoint['endpoint'],
                    kpi=sub_endpoint['kpi'],
                    resource=sub_endpoint['resource'],
                )

                # Convert seconds to nanoseconds before touching any state, so
                # bad numeric input fails without side effects.
                sample_interval_ns = int(interval * 1_000_000_000)
                # NOTE(review): the requested *duration* is used as the heartbeat
                # interval - confirm this is what Subscription expects.
                heartbeat_interval_ns = int(duration * 1_000_000_000)

                # Do not leak a running subscription when an id is re-used.
                previous = self._subscriptions.pop(sub_id, None)
                if previous is not None:
                    self.logger.warning("Replacing existing subscription %s", sub_id)
                    self._stop_subscription(sub_id, previous)

                self._subscriptions[sub_id] = Subscription(
                    sub_id                = sub_id,
                    gnmi_client           = self.client,                   # type: ignore
                    path_list             = paths,                         # <- list of paths
                    metric_queue          = self._output_queue,
                    mode                  = 'stream',                      # Default mode
                    sample_interval_ns    = sample_interval_ns,
                    heartbeat_interval_ns = heartbeat_interval_ns,
                    encoding              = 'json_ietf',                   # Default encoding
                )

                self.logger.info("Subscribing to %s with job_id %s ...", sub_endpoint, sub_id)
                results.append(True)
            except Exception:
                # Best effort: one bad entry must not abort the remaining ones.
                self.logger.exception("Failed to create subscription: %s", subscription)
                results.append(False)
        return results

    def UnsubscribeState(self, resource_key: str) -> bool:
        """
        Stop the given subscription.

        :param resource_key: Id used when the subscription was created.
        :return: ``True`` if the subscription existed and stopped cleanly,
            ``False`` if the id is unknown or stopping raised. The entry is
            removed from the registry in either case.
        """
        sub = self._subscriptions.pop(resource_key, None)
        if sub is None:
            self.logger.error("Attempt to unsubscribe unknown id=%s", resource_key)
            return False
        if not self._stop_subscription(resource_key, sub):
            return False
        self.logger.info("Unsubscribed from state: %s", resource_key)
        return True

    def GetState(self, duration : float, blocking : bool = True, terminate: Optional[queue.Queue] = None
                 ) -> Iterator[Tuple[float, str, Any]]:
        """
        Yield telemetry samples from the output queue.

        :param duration:  Maximum time, in seconds, to keep yielding.
        :param blocking:  When ``True``, wait (in slices of at most one second)
            for new samples until *duration* expires. When ``False``, stop as
            soon as the queue is empty.
        :param terminate: Optional queue; any item placed in it stops iteration.
        :return: Iterator over queued samples, yielded exactly as they were
            enqueued (expected to be ``(timestamp, resource_key, data)``).
        """
        start_time = datetime.now(pytz.utc)
        self.logger.debug("GetState called with duration=%s, blocking=%s, start=%s",
                          duration, blocking, start_time)
        while True:
            if terminate is not None and not terminate.empty():
                self.logger.info("Termination signal received, stopping GetState")
                break

            remaining = duration - (datetime.now(pytz.utc) - start_time).total_seconds()
            if remaining <= 0:
                self.logger.info("Duration expired, stopping GetState")
                break

            try:
                if blocking:
                    # Short waits keep the terminate/duration checks responsive;
                    # never wait past the requested duration.
                    sample = self._output_queue.get(timeout=min(1.0, remaining))
                else:
                    sample = self._output_queue.get_nowait()
            except queue.Empty:
                if not blocking:
                    self.logger.info("No more samples in queue, exiting GetState")
                    return
                continue

            self.logger.info("Retrieved state sample: %s", sample)
            yield sample
 
 No newline at end of file
+32 −0
Original line number Diff line number Diff line
from enum import IntEnum, unique

@unique
class KPI(IntEnum):
    """
    Generic KPI codes that map to interface statistics.

    Codes are grouped by hundreds: 1xx packet counters, 2xx byte counters,
    3xx power readings. To translate a numeric code into its name use the
    built-in enum lookup, e.g. ``KPI(101).name``; an unknown code raises
    ``ValueError``.

    TODO: verify KPI names and codes with KPI proto file. (How many TFS supports)
    """

    # ---- packet counters ----
    PACKETS_TRANSMITTED = 101
    PACKETS_RECEIVED = 102
    PACKETS_DROPPED = 103

    # ---- byte counters ----
    BYTES_TRANSMITTED = 201
    BYTES_RECEIVED = 202
    BYTES_DROPPED = 203

    # ---- power ----
    INBAND_POWER = 301

    # TODO: Add more KPIs as needed.
+101 −0
Original line number Diff line number Diff line
import logging
from typing import Dict, List, Optional, Union
from .KPI import KPI

# Module-level logger named after this module; PathMapper.build uses it for
# its debug message. The level is inherited from the logging configuration.
logger = logging.getLogger(__name__)
# logger.setLevel(logging.INFO)


class PathMapper:
    """
    Build the list of candidate gNMI path strings for an interface KPI.

    The mapper holds two tables:
    * ``_LEAF_CANDIDATES`` - leaf names that may carry each KPI, and
    * ``_PREFIXES``        - path templates the leaf names are inserted into.

    ``build`` returns every template/leaf combination. The caller is expected
    to try the candidates against the target device (that logic lives outside
    this class).
    """

    # Leaf names that can satisfy each KPI. The lists are not exhaustive and
    # may be extended or trimmed; order here is the order of the result.
    _LEAF_CANDIDATES: Dict[KPI, List[str]] = {
        # packets
        KPI.PACKETS_TRANSMITTED: ["out-pkts", "out-unicast-pkts", "tx-pkts", "packets-output"],
        KPI.PACKETS_RECEIVED: ["in-pkts", "in-unicast-pkts", "rx-pkts", "packets-input"],
        KPI.PACKETS_DROPPED: ["in-discards", "out-discards", "packets-drop"],
        # bytes
        KPI.BYTES_TRANSMITTED: ["out-octets", "tx-octets", "bytes-output"],
        KPI.BYTES_RECEIVED: ["in-octets", "rx-octets", "bytes-input"],
        KPI.BYTES_DROPPED: ["in-octets-discarded", "out-octets-discarded", "bytes-drop"],
        # power
        # TODO: these leaf names still need to be verified. In-band power is
        # not a standard OpenConfig leaf; actual names may vary by implementation.
        KPI.INBAND_POWER: ["inband-power", "inband-power-state"],
    }

    # Path templates (no explicit origin). Add further variants here if needed,
    # e.g. 'interfaces/interface[name="{endpoint}"]/state/{leaf}'.
    _PREFIXES = [
        'interfaces/interface[name={endpoint}]/state/counters/{leaf}',
    ]

    @classmethod
    def build(cls,
              endpoint: str, kpi: Union[KPI, int], resource: Optional[str] = None
              ) -> List[str]:
        """
        Return the list of candidate path strings for one KPI on one endpoint.

        :param endpoint:  Interface name, e.g. 'Ethernet0'
        :param kpi:       KPI enum member or its integer code
        :param resource:  Resource kind; only "interface" is supported
        :return:          One path per (leaf name, template) pair, leaf-major order
        :raises ValueError: unknown KPI code, KPI without leaf candidates,
                            or unsupported resource
        """
        try:
            kpi_enum = KPI(kpi)
        except ValueError as exc:
            raise ValueError(f"Unsupported KPI code: {kpi}") from exc

        leaf_names = cls._LEAF_CANDIDATES.get(kpi_enum, [])
        if not leaf_names:
            raise ValueError(f"No leaf candidates for KPI {kpi_enum}")

        # Checked after the KPI validation so error precedence is unchanged.
        if resource != "interface":
            raise ValueError(f"Unsupported resource: {resource}")

        candidates: List[str] = [
            template.format(endpoint=endpoint, leaf=leaf_name)
            for leaf_name in leaf_names
            for template in cls._PREFIXES
        ]

        logger.debug("Built %d candidate path(s) for %s on %s",
                     len(candidates), kpi_enum.name, endpoint)
        return candidates
Loading