Commit fb90ccbd authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Refactor Telemetry Collector to handle MGON Streaming

- KPI handling and logging in gNMI collector
- Update in SubscriptionNew.py
- KPI type added in test
parent a1260aac
Loading
Loading
Loading
Loading
+5 −6
Original line number Diff line number Diff line
@@ -58,7 +58,7 @@ def get_subscription_parameters(
                         include_components   = False
                         )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")     #TODO: Change to TFS NotFoundException 
    endpoints = device.device_endpoints

    # LOGGER.info(f"Device for KPI ID: {kpi_id} - {endpoints}")
@@ -103,9 +103,8 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context DB.
    Returns:
        - Collector instance if found, otherwise None.
    Raises:
        - Exception if the KPI ID is not found or the collector cannot be created.
        - Collector instance if found, otherwise raises exception
          if the KPI ID is not found or the collector cannot be created.
    """
    LOGGER.info(f"Getting collector for KPI ID: {kpi_id}")
    kpi_id_obj = KpiId()
@@ -113,7 +112,7 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    kpi_descriptor     = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    # LOGGER.info(f"KPI Descriptor: {kpi_descriptor}")
    if not kpi_descriptor:
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")        #TODO: Change to TFS NotFoundException 
    
    # device_uuid       = kpi_descriptor.device_id.device_uuid.uuid
    device = get_device( context_client       = context_client,
@@ -125,6 +124,6 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    # Getting device collector (testing)
    collector : _Collector = get_driver(driver_instance_cache, device)
    if collector is None:
        raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.")
        raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.")        #TODO: Change to TFS NotFoundException 
    # LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    return collector
+8 −9
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ from typing import Dict, Optional, Tuple, List, Union, Any, Iterator
from pygnmi.client import gNMIclient
from telemetry.backend.service.collector_api._Collector import _Collector
from .PathMapper import PathMapper
from .SubscriptionNew import Subscription
from .SubscriptionNew import LOGGER, Subscription

logging.basicConfig(
    level=logging.DEBUG,
@@ -128,18 +128,17 @@ class GNMIOpenConfigCollector(_Collector):
                                kpi      = sub_endpoint['kpi'     ],
                                resource = sub_endpoint['resource'],
                )

                LOGGER.debug("Built %d candidate path(s) for endpoint '%s'", len(paths), sub_endpoint['endpoint'])
                self._subscriptions[sub_id] = Subscription(
                    sub_id                = sub_id,
                    gnmi_client           = self.client,                   # type: ignore
                    path_list             = paths,                         # <- list of paths
                    metric_queue          = self._output_queue,
                    mode                  = 'stream',                      # Default mode
                    mode                  = 'sample',                      # Entry mode: sample/on_change/target_defined
                    sample_interval_ns    = int(interval * 1_000_000_000), # Convert seconds to nanoseconds
                    heartbeat_interval_ns = int(duration * 1_000_000_000), # Convert seconds to nanoseconds
                    encoding              = 'json_ietf',                   # Default encoding
                    total_duration        = duration,
                    encoding              = 'json',                        # Use 'json' encoding (not 'json_ietf')
                )

                self.logger.info("Subscribing to %s with job_id %s ...", sub_endpoint, sub_id)
                response.append(True)
            except:
+3 −4
Original line number Diff line number Diff line
@@ -69,9 +69,8 @@ class PathMapper:
        ],
        
        # ---- total power (optical wavelength router) ----------------
        # For optical devices using flex-scale-mg-on YANG model
        # Path format: optical-power-total-input/instant or optical-power-total-output/instant
        KPI.TOTAL_POWER: [
        # For optical devices using FlexScale MGON YANG model
        KPI.KPISAMPLETYPE_OPTICAL_POWER_TOTAL_INPUT : [
            "optical-power-total-input/instant",        
        ],
    }
+57 −24
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# limitations under the License.


import time
from pygnmi.client import gNMIclient  # type: ignore
from queue import Queue
from typing import Callable, Tuple, Optional, List, Any
@@ -21,8 +22,8 @@ import json
import logging
import threading

logger = logging.getLogger(__name__)
# logger.setLevel(logging.INFO)
LOGGER = logging.getLogger(__name__)
# LOGGER.setLevel(logging.INFO)


class Subscription:
@@ -41,7 +42,8 @@ class Subscription:
        metric_queue:          Queue,
        mode:                  str             = "stream",
        sample_interval_ns:    int             = 10_000_000_000,
        heartbeat_interval_ns: Optional[int] = None,
        heartbeat_interval_ns: Optional[int]   = 10_000_000_000,
        total_duration:        Optional[float] = 60.0,                  # in seconds
        encoding:              str             = "json_ietf",
        on_update:             Optional[Callable[[dict], None]] = None,
    ) -> None:
@@ -54,15 +56,30 @@ class Subscription:
        self._thread       = threading.Thread(
            target = self._run,
            args   = (
                path_list, mode,
                sample_interval_ns, heartbeat_interval_ns, encoding, on_update,
                path_list, mode, sample_interval_ns, 
                heartbeat_interval_ns, encoding, on_update,
            ),
            name=f"gnmi-sub-{sub_id[:8]}",
            daemon=True,
        )
        # Start the subscription thread
        self._thread.start()
        logger.info("Started subscription %s",sub_id)
        
        # Stop the subscription after the given duration
        if total_duration and total_duration > 0:
            def stop_after_duration():
                time.sleep(total_duration)
                LOGGER.warning(f"Execution duration ({total_duration}s) completed for Subscription: {sub_id}")
                self.stop()

            duration_thread = threading.Thread(
                target=stop_after_duration, daemon=True, name=f"stop_after_duration_{sub_id[:8]}"
            )
            duration_thread.start()
        else:
            LOGGER.debug("Subscription %s has no total duration limit", sub_id)

        LOGGER.info("Started subscription %s",sub_id)
    # --------------------------------------------------------------#
    #  Public helpers                                               #
    # --------------------------------------------------------------#
@@ -72,17 +89,17 @@ class Subscription:
    def stop(self) -> None:
        """Gracefully stop the subscription thread."""
        if not self._thread.is_alive():
            logger.debug("Subscription %s thread already stopped", self.sub_id)
            LOGGER.debug("Subscription %s thread already stopped", self.sub_id)
            return
        
        logger.debug("Stopping subscription %s...", self.sub_id)
        LOGGER.debug("Stopping subscription %s...", self.sub_id)
        self._stop_event.set()
        self._thread.join(timeout=3)
        
        if self._thread.is_alive():
            logger.warning("Subscription %s thread did not stop within timeout", self.sub_id)
            LOGGER.warning("Subscription %s thread did not stop within timeout", self.sub_id)
        else:
            logger.info("Stopped subscription %s", self.sub_id)
            LOGGER.info("Stopped subscription %s", self.sub_id)

    # --------------------------------------------------------------#
    #  Internal loop                                                #
@@ -186,6 +203,7 @@ class Subscription:
                break

            entry: dict = {"path": path}
            LOGGER.debug("Subscription %s preparing entry for path: %s", self.sub_id, path)

            if entry_mode == "sample":
                entry["mode"]            = "sample"
@@ -202,50 +220,65 @@ class Subscription:
                "mode": top_mode,
                "encoding": encoding,
            }
            logger.debug("Subscription %s to be requested: %s", self.sub_id, request)
            LOGGER.debug("Subscription %s to be requested: %s", self.sub_id, request)
            try:
                logger.debug("Sub %s attempting path %s", self.sub_id, path)
                LOGGER.debug("Sub %s attempting path %s", self.sub_id, path)
                for stream in self.gnmi_client.subscribe(request):
                    # Check if stop was requested
                    if self._stop_event.is_set():
                        logger.debug("Sub %s stop requested, breaking stream loop", self.sub_id)
                        LOGGER.debug("Sub %s stop requested, breaking stream loop", self.sub_id)
                        break
                    
                    LOGGER.debug("Sub %s received stream message: %s", self.sub_id, stream)
                    
                    # DEBUG: Check if update has actual update messages
                    if stream.HasField("update"):
                        LOGGER.debug("Sub %s update field present, num updates: %d", 
                                   self.sub_id, len(stream.update.update))
                        if len(stream.update.update) == 0:
                            LOGGER.warning("Sub %s received update notification with NO data values - device may have no data for path %s",
                                         self.sub_id, path)
                        for i, upd in enumerate(stream.update.update):
                            LOGGER.debug("Sub %s update[%d] has val: %s, path elem count: %d", 
                                       self.sub_id, i, upd.HasField("val"), 
                                       len(upd.path.elem) if upd.path else 0)
                    
                    # Parse the protobuf message directly (like pygnmi does)
                    msg_dict = self._parse_subscribe_response(stream)
                    LOGGER.debug("Sub %s received message: %s", self.sub_id, msg_dict)
                    
                    # Process any update data
                    if msg_dict.get('update'):
                        logger.debug("Sub %s got update data", self.sub_id)
                        LOGGER.debug("Sub %s got update data", self.sub_id)
                        if on_update:
                            on_update(msg_dict)
                        else:
                            self._queue.put(msg_dict)
                    # Put a dummy update if syncResponse is received to prevent timeout
                    elif msg_dict.get('sync_response'):
                        logger.debug("Sub %s received sync response", self.sub_id)
                        LOGGER.debug("Sub %s received sync response", self.sub_id)
                        # Optional: put a notification about the sync
                        if not on_update:
                            self._queue.put({"type": "sync_response", "value": True})
                    else:
                        logger.warning("Sub %s received unknown message: %s", self.sub_id, msg_dict)
                        LOGGER.warning("Sub %s received unknown message: %s", self.sub_id, msg_dict)

            except grpc.RpcError as err:
                # Handle graceful shutdown (channel closed)
                if err.code() == grpc.StatusCode.CANCELLED:
                    logger.debug("Sub %s cancelled (channel closed) - graceful shutdown", self.sub_id)
                    LOGGER.debug("Sub %s cancelled (channel closed) - graceful shutdown", self.sub_id)
                    break
                elif err.code() == grpc.StatusCode.INVALID_ARGUMENT:
                    logger.warning("Path '%s' rejected (%s) -- trying next",
                    LOGGER.warning("Path '%s' rejected (%s) -- trying next",
                                  path, err.details())
                    continue
                else:
                    logger.exception("Subscription %s hit gRPC error: %s",
                    LOGGER.exception("Subscription %s hit gRPC error: %s",      # Change with TFS Exception
                                    self.sub_id, err)
                    break

            except Exception as exc:  # pylint: disable=broad-except
                logger.exception("Subscription %s failed: %s", self.sub_id, exc)
                LOGGER.exception("Subscription %s failed: %s", self.sub_id, exc)        # Change with TFS Exception
                break

        logger.info("Subscription thread %s terminating", self.sub_id)
        LOGGER.info("Subscription thread %s terminating", self.sub_id)
+2 −2
Original line number Diff line number Diff line
@@ -56,7 +56,7 @@ devices = {
        'username': 'admin',
        'password': 'admin',
        'insecure': True,
        'kpi'     : KPI.TOTAL_POWER,
        'kpi'     : KPI.KPISAMPLETYPE_OPTICAL_POWER_TOTAL_INPUT,
        'resource': 'wavelength-router',    #TODO: verify resource name form mg-on model
        'endpoint': '1',
    },
@@ -64,7 +64,7 @@ devices = {

def creat_basic_sub_request_parameters() -> dict:

    device   = devices['device3']
    device = devices['mgon']
    if device:
        kpi      = device['kpi']
        resource = device['resource']