Commit 104339b6 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Enhance optical controller integration with closed-loop services

- adding pluggable KPI types
- updating collector metadata handling
- refining connection logic for device drivers.
parent 99f9e846
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -77,6 +77,6 @@ enum KpiSampleType {
    KPISAMPLETYPE_INT_DROP_REASON    = 2202;

    // PLUGGABLES
    KPISAMPLETYPE_PRE_FEC_BER              = 2301;
    KPISAMPLETYPE_PRE_FEC_BER_PLUGGABLE    = 2301;
    KPISAMPLETYPE_RECEIVED_POWER_PLUGGABLE = 2302;
}
+6 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ message CollectorId {
   context.Timestamp  start_time    = 5; // Timestamp when Collector start execution
   context.Timestamp  end_time      = 6; // Timestamp when Collector stop execution
   INTCollector       int_collector = 7; // Extra optional information about INT collectors
   CollectorMetaInfo  coll_meta_info= 8; // Extra optional collector information
 }

message INTCollector {
@@ -45,6 +46,11 @@ message INTCollector {
    string context_id     = 4; // Context identifier related to this collector
}

message CollectorMetaInfo {
  context.DeviceDriverEnum  device_driver = 1;
  string                    device_type   = 2;
}

message CollectorFilter {
  // Collector that fulfill the filter are those that match ALL the following fields.
  // An empty list means: any value is accepted.
+64 −1
Original line number Diff line number Diff line
@@ -17,8 +17,9 @@ import uuid
import logging
from typing import List, Tuple, Optional
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from .collector_api.DriverInstanceCache      import DriverInstanceCache, get_driver
from .collectors.int_collector.INTCollector  import INTCollector
from telemetry.backend.Tools                 import get_connect_rules
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names
@@ -224,6 +225,68 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    # LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    return collector

def get_collector_by_meta_info(
        kpi_id: str, device_type: str, device_driver: int,
        kpi_manager_client, context_client, driver_instance_cache: DriverInstanceCache
) -> Optional[_Collector]:
    """
    Get a collector instance using device_type and device_driver from coll_meta_info.
    Uses Context only to retrieve the device connection parameters (address/port/settings);
    the filter fields for driver class selection come from coll_meta_info, not from the
    device's registered driver list (which may be absent for pluggable devices).
    """
    LOGGER.info(f"Getting collector by meta_info for KPI ID: {kpi_id} "
                f"device_type={device_type} device_driver={device_driver}")
    kpi_id_obj             = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id  # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor         = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")

    device_uuid = kpi_descriptor.device_id.device_uuid.uuid
    device = get_device(
        context_client       = context_client,
        device_uuid          = device_uuid,
        include_config_rules = True,
        include_components   = False,
    )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device {device_uuid} not found in Context.")

    connect_rules = get_connect_rules(device.device_config)
    address  = connect_rules.get('address',  '127.0.0.1')
    port     = connect_rules.get('port',     '0')
    settings = connect_rules.get('settings', '{}')
    try:
        settings = json.loads(settings)
    except (ValueError, TypeError):
        settings = {}

    collector : _Collector = driver_instance_cache.get_by_meta_info(
        device_uuid, device_type, device_driver, address, port, settings
    )
    if collector is None:
        raise Exception(
            f"KPI ID: {kpi_id} - No collector found for "
            f"device_type={device_type} device_driver={device_driver}."
        )

    connected = False
    try:
        connected = collector.Connect()
    except Exception as e:
        driver_instance_cache.delete(device_uuid)
        raise Exception(
            f"Collector({collector.__class__.__name__}) connection to device({device_uuid}) failed: {e}"
        ) from e
    if not connected:
        driver_instance_cache.delete(device_uuid)
        raise Exception(
            f"Collector({collector.__class__.__name__}) failed to connect to device({device_uuid})"
        )

    return collector

def get_node_level_int_collector(collector_id: str, kpi_id: str, address: str, interface: str, port: int,
            service_id: str, context_id: str) -> Optional[_Collector]:
    """
+14 −3
Original line number Diff line number Diff line
@@ -17,7 +17,9 @@ import time
import logging
import threading

from .HelperMethods   import get_collector_by_kpi_id, get_subscription_parameters, get_node_level_int_collector, get_mgon_subscription_parameters, get_mgon_collector
from .HelperMethods   import (get_collector_by_kpi_id, get_collector_by_meta_info,
                               get_subscription_parameters, get_node_level_int_collector,
                               get_mgon_subscription_parameters, get_mgon_collector)
from common.Constants import ServiceNameEnum
from common.Settings  import get_service_port_grpc
from confluent_kafka  import Consumer as KafkaConsumer
@@ -158,9 +160,18 @@ class TelemetryBackendService(GenericGrpcService):
        #         insecure    = collector.get('insecure', True),
        #         skip_verify = collector.get('skip_verify', True)
        #     )
        else:
            device_driver = collector.get('device_driver', None)  # int: DeviceDriverEnum value
            device_type   = collector.get('device_type',   None)  # str: e.g. 'packet-router'
            if device_driver and device_type:
                self.device_collector = get_collector_by_meta_info(
                    kpi_id, device_type, device_driver,
                    self.kpi_manager_client, self.context_client, self.driver_instance_cache
                )
            else:
                self.device_collector = get_collector_by_kpi_id(
                kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache)
                    kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache
                )

        if not self.device_collector:
            LOGGER.warning(f"KPI ID: {kpi_id} - Collector not found. Skipping...")
+22 −2
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@

import json, logging, threading
from typing import Any, Dict, Optional
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.proto.context_pb2 import Device, Empty
from context.client.ContextClient import ContextClient
from telemetry.backend.Tools import get_connect_rules
@@ -63,6 +63,18 @@ class DriverInstanceCache:
            # LOGGER.info('get method finished.')
            return driver_instance

    def get_by_meta_info(
        self, device_uuid: str, device_type: str, device_driver: int,
        address: str, port: int, settings: Dict[str, Any]
    ) -> _Collector:
        """Get a collector instance using explicit device_type and device_driver from coll_meta_info,
        bypassing the Device proto's driver list (which may not be set for pluggable devices)."""
        filter_fields = {
            FilterFieldEnum.DEVICE_TYPE: device_type,
            FilterFieldEnum.DRIVER:      [device_driver],
        }
        return self.get(device_uuid, filter_fields=filter_fields, address=address, port=port, settings=settings)

    def delete(self, device_uuid : str) -> None:
        with self._lock:
            device_driver = self._device_uuid__to__driver_instance.pop(device_uuid, None)
@@ -107,7 +119,15 @@ def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) ->

    driver : _Collector = driver_instance_cache.get(
        device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)

    try:
        driver.Connect()
    except Exception as e:
        driver_instance_cache.delete(device_uuid)
        raise OperationFailedException(
            'Driver({:s}) connection to device({:s})'.format(driver.__class__.__name__, device_uuid),
            extra_details=str(e)
        ) from e

    return driver

Loading