Commit 421eda13 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Refactor imports and enhance gNMI collector functionality

- Updated import paths for _Collector in INTCollector.py.
- Added missing scapy dependency in requirements.in.
- Changed return types to Optional in HelperMethods.py.
- Improved disconnect handling in GnmiOpenConfigCollector.py.
- Added TOTAL_POWER KPI in KPI.py.
- Enhanced PathMapper.py for wavelength router support.
- Refined SubscriptionNew.py with graceful stop and response parsing.
- Updated test cases in messages.py and test_unit_GnmiOpenConfigCollector.py for better logging and parameter handling.
parent 87cb50ce
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
from telemetry.backend.collector_api._Collector import _Collector
from telemetry.backend.service.collector_api._Collector import _Collector

from scapy.all import *
import struct
+1 −0
Original line number Diff line number Diff line
@@ -20,3 +20,4 @@ kafka-python==2.0.6
numpy==2.0.1
pygnmi==0.8.14
pytz>=2025.2
scapy==2.6.1    # TODO: UBI need to confirm the version (This depencdency was missing)
+4 −3
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@

import uuid
import logging
from typing import Optional
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from common.proto.kpi_manager_pb2            import KpiId
@@ -24,7 +25,7 @@ LOGGER = logging.getLogger(__name__)

def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval
        ) -> list[tuple] | None:
        ) -> Optional[list[tuple]]:
    """
    Method to get subscription parameters based on KPI ID.
    Returns a list of tuples with subscription parameters.
@@ -95,12 +96,12 @@ def get_subscription_parameters(


def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> _Collector | None:
                            ) -> Optional[_Collector]:
    """
    Method to get a collector instance based on KPI ID.
    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
        - The device must be available in the context DB.
    Returns:
        - Collector instance if found, otherwise None.
    Raises:
+32 −12
Original line number Diff line number Diff line
@@ -75,12 +75,31 @@ class GNMIOpenConfigCollector(_Collector):
    def Disconnect(self) -> bool:
        """
        Disconnect from the gNMI target device.
        Stops all active subscriptions before closing the connection.
        """
        # Stop all active subscriptions first
        if self._subscriptions:
            self.logger.info("Stopping %d active subscription(s) before disconnect...", 
                           len(self._subscriptions))
            # Create a list of subscription IDs to avoid dictionary size change during iteration
            sub_ids = list(self._subscriptions.keys())
            for sub_id in sub_ids:
                try:
                    self.UnsubscribeState(sub_id)
                except Exception as exc:
                    self.logger.warning("Error stopping subscription %s during disconnect: %s", 
                                      sub_id, exc)
        
        if self.connected and self.client:
            try:
                self.client.close()
                self.connected = False
                self.logger.info("Disconnected from gNMI target %s:%s", self.address, self.port)
                return True
            except Exception as exc:
                self.logger.error("Error during disconnect: %s", exc)
                self.connected = False  # Mark as disconnected even if close fails
                return False
        else:
            self.logger.warning("Not connected to any gNMI target.")
            return True
@@ -129,18 +148,19 @@ class GNMIOpenConfigCollector(_Collector):
        return response

    def UnsubscribeState(self, resource_key: str) -> bool:
        """Stop the given subscription."""
        """Stop the given subscription gracefully."""
        sub = self._subscriptions.pop(resource_key, None)
        if not sub:
            self.logger.error("Attempt to unsubscribe unknown id=%s", resource_key)
            # raise KeyError(f"Unknown subscription id '{resource_key}'.")
            return False
        try: sub.stop()
        except:
            self.logger.exception("Error stopping subscription %s. ", resource_key)
            self.logger.warning("Attempt to unsubscribe unknown id=%s", resource_key)
            return False
        
        try:
            sub.stop()
            self.logger.info("Unsubscribed from state: %s", resource_key)
            return True
        except Exception as exc:
            self.logger.error("Error stopping subscription %s: %s", resource_key, exc)
            return False

    def GetState(self, duration : float, blocking : bool = True, terminate: Optional[queue.Queue] = None
                 ) -> Iterator[Tuple[float, str, Any]]:
+1 −0
Original line number Diff line number Diff line
@@ -25,4 +25,5 @@ class KPI(IntEnum): # TODO: verify KPI names and codes with KPI proto fi
    BYTES_RECEIVED      = 202
    BYTES_DROPPED       = 203
    INBAND_POWER        = 301
    TOTAL_POWER         = 302
    # TODO: Add more KPIs as needed,
Loading