Commit c2b96cf3 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: enhance telemetry component with multiple service IP exports and subscription handling

parent bf9f7c11
Loading
Loading
Loading
Loading
+35 −2
Original line number Diff line number Diff line
@@ -19,11 +19,44 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
# RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9094'

IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}')
export IP_KPI
echo "KPI Manager Service IP: ${IP_KPI}"

IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}')
export IP_TELE
echo "Telemetry Frontend Service IP: ${IP_TELE}"

IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}')
export IP_CONTEXT
echo "Context Service IP: ${IP_CONTEXT}"

# Start Kafka port-forward in background
kubectl port-forward -n kafka service/kafka-public 9094:9094 > /dev/null 2>&1 &
KAFKA_PF_PID=$!

# Function to cleanup port-forward on exit
cleanup() {
    # echo "Cleaning up Kafka port-forward (PID: ${KAFKA_PF_PID})..."
    kill ${KAFKA_PF_PID} 2>/dev/null || true
    wait ${KAFKA_PF_PID} 2>/dev/null || true
    sleep 1
}

trap cleanup EXIT INT TERM
echo "Waiting for Kafka port-forward to be ready..."
sleep 1

# Verify port-forward is working
if ! nc -z 127.0.0.1 9094 2>/dev/null; then
    echo "WARNING: Kafka port-forward may not be ready yet"
fi

python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
    telemetry/backend/tests/test_backend.py
+15 −2
Original line number Diff line number Diff line
@@ -19,9 +19,22 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9094'


IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}')
export IP_KPI
echo "KPI Manager Service IP: ${IP_KPI}"

IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}')
export IP_TELE
echo "Telemetry Frontend Service IP: ${IP_TELE}"

IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}')
export IP_CONTEXT
echo "Context Service IP: ${IP_CONTEXT}"

# This is unit test (should be tested with container-lab running)
python3 -m pytest --log-level=info --log-cli-level=info --verbose \
    telemetry/backend/tests/gnmi_oc/test_unit_GnmiOpenConfigCollector.py 
# python3 -m pytest --log-level=info --log-cli-level=info --verbose \
#     telemetry/backend/tests/gnmi_oc/test_unit_GnmiOpenConfigCollector.py::test_full_workflow

# This is integration test (should be tested with container-lab running)
python3 -m pytest --log-level=info --log-cli-level=info --verbose \
+104 −39
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import uuid
import logging
from .collector_api._Collector               import _Collector
@@ -20,48 +21,72 @@ from .collectors.int_collector.INTCollector import INTCollector
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names
from common.tools.context_queries.Service    import get_service_by_uuid

from typing import List, Tuple, Optional
from .collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector

LOGGER = logging.getLogger(__name__)

def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval
def get_optical_subscription(
        kpi_id: str, kpi_descriptor, context_client, duration: float, interval: float, resource: str,
) -> Optional[List[Tuple]]:
    """
    Method to get subscription parameters based on KPI ID.
    Returns a list of tuples with subscription parameters.
    Each tuple contains:
        - Subscription ID (str)
        - Dictionary with:
            - "kpi" (str): KPI ID
            - "endpoint" (str): Endpoint name (e.g., 'eth0')
            - "resource" (str): Resource type (e.g., 'interface')
        - Sample interval (float)
        - Report interval (float)
    If the KPI ID is not found or the device is not available, returns None.
    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    Get subscription parameters for optical devices (optical-roadm, optical-transponder) 
    using service uuid from KPI descriptor to find the endpoint in service config rules.    
    Returns:
        List of subscription tuples or None if unable to create subscription
    """
    kpi_id_obj = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
    _service_id = kpi_descriptor.service_id.service_uuid.uuid
    LOGGER.info(f"KPI Descriptor (Service ID): {_service_id}")
    
    service = get_service_by_uuid(context_client, _service_id)
    if not service:
        LOGGER.warning(f"KPI ID: {kpi_id} - Service not found for Service ID: {_service_id}. Skipping...")
        return None
    
    kpi_sample_type = kpi_descriptor.kpi_sample_type
    LOGGER.info(f"KPI Descriptor (KPI Sample Type): {kpi_sample_type}")
    LOGGER.debug(f"Service for KPI ID: {kpi_id} - {service.name} - {service.service_config}")
    if not service.service_config.config_rules:
        LOGGER.warning(f"KPI ID: {kpi_id} - No config rules in service config. Skipping...")
        return None

    device = get_device( context_client       = context_client,
                         device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                         include_config_rules = False,
                         include_components   = False
    # Get the first config rule's custom resource_value
    config_rule = service.service_config.config_rules[0]
    if not config_rule.HasField('custom'):
        LOGGER.warning(f"KPI ID: {kpi_id} - No custom config in service config. Skipping...")
        return None

    resource_value = json.loads(config_rule.custom.resource_value)
    
    if 'ob_id' not in resource_value:
        LOGGER.warning(f"KPI ID: {kpi_id} - Resource ob_id not found in service config. Skipping...")
        return None

    endpoint = resource_value['ob_id']
    return [
        (
            str(uuid.uuid4()),
            {
                "kpi"      : kpi_descriptor.kpi_sample_type,
                "endpoint" : endpoint,
                "resource" : resource,
            },
            float(duration),
            float(interval),
        )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")
    endpoints = device.device_endpoints
    ]


def get_ip_subscriptions(
        kpi_id: str, kpi_descriptor, device, context_client, duration: float, interval: float, resource: str,
) -> Optional[List[Tuple]]:
    """
    Get subscription parameters for IP/packet devices (routers, switches, etc.) 
    using device endpoints from context to create subscriptions for each endpoint.
    Returns:
        List of subscription tuples (one per endpoint) or None if unable to create subscriptions
    """
    endpoints = device.device_endpoints
    LOGGER.debug(f"Device for KPI ID: {kpi_id} - {endpoints}")
    endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints]
    for endpoint_id in endpoints:
@@ -76,17 +101,16 @@ def get_subscription_parameters(
    LOGGER.debug(f"Endpoint data: {endpoint_data}")

    subscriptions = []
    sub_id = None
    for endpoint in endpointsIds:
        sub_id = str(uuid.uuid4())  # Generate a unique subscription ID
        sub_id = str(uuid.uuid4())
        LOGGER.info(f"Endpoint names only: {endpoint_data[endpoint.endpoint_uuid.uuid][0]}") 
        subscriptions.append(
            (
                sub_id,  # Example subscription ID
                sub_id,
                {
                    "kpi"      : kpi_sample_type,   # As request is based on the single KPI so it should have only one endpoint
                    "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0],  # Endpoint name
                    "resource" : 'interface',  # Example resource type
                    "kpi"      : kpi_descriptor.kpi_sample_type,
                    "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0],
                    "resource" : resource,
                },
                float(duration),
                float(interval),
@@ -94,6 +118,47 @@ def get_subscription_parameters(
        )
    return subscriptions


def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval, resource: str = "",
        ) -> Optional[List[Tuple]]:
    """
    Method to get subscription parameters based on KPI ID.
    Returns a list of tuples with subscription parameters.
    Each tuple contains:
        - Subscription ID (str)
        - Dictionary with:
            - "kpi" (str): KPI ID
            - "endpoint" (str): Endpoint name (e.g., 'eth0')
            - "resource" (str): Resource type (e.g., 'interface')
        - Sample interval (float)
        - Report interval (float)
    If the KPI ID is not found or the device is not available, returns None.
    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    """
    kpi_id_obj             = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id
    kpi_descriptor         = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
        return None
    
    device = get_device(context_client       = context_client,
                        device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                        include_config_rules = False,
                        include_components   = False
                        )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")
    
    # Route to appropriate subscription handler based on device type
    if device.device_type in ['optical-roadm', 'optical-transponder']:
        return get_optical_subscription(kpi_id, kpi_descriptor, context_client, duration, interval, resource="wavelength-router")
    else:
        return get_ip_subscriptions(kpi_id, kpi_descriptor, device, context_client, duration, interval, resource="interface")

def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> Optional[_Collector]:
    """
+40 −7
Original line number Diff line number Diff line
@@ -52,8 +52,8 @@ class TelemetryBackendService(GenericGrpcService):
                                            'auto.offset.reset'  : 'latest'})
        self.driver_instance_cache = driver_instance_cache
        self.device_collector      = None
        self.context_client        = ContextClient()
        self.kpi_manager_client    = KpiManagerClient()
        self.context_client        = ContextClient(host="10.152.183.28", port=1010)
        self.kpi_manager_client    = KpiManagerClient(host="10.152.183.25", port=30010)
        self.active_jobs = {}

    def install_servicers(self):
@@ -144,6 +144,15 @@ class TelemetryBackendService(GenericGrpcService):
            )
            return
        # Rest of the collectors
        # elif context_id == "43813baf-195e-5da6-af20-b3d0922e71a7":
        #     self.device_collector = get_mgon_collector(
        #         address     = collector['host'],            # "172.17.254.24",
        #         port        = collector['port'],            # 50061,
        #         username    = collector['username'],        # "admin",
        #         password    = collector['password'],        # "admin",
        #         insecure    = collector.get('insecure', True),
        #         skip_verify = collector.get('skip_verify', True)
        #     )
        else:
            self.device_collector = get_collector_by_kpi_id(
                kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache)
@@ -156,6 +165,12 @@ class TelemetryBackendService(GenericGrpcService):
        resource_to_subscribe = get_subscription_parameters(
            kpi_id, self.kpi_manager_client, self.context_client, duration, interval
        )
        # TODO: Remove after confirming get_subscription_parameters generic is working correctly
        # resource_to_subscribe = get_mgon_subscription_parameters(
        #         collector['resource'], collector['endpoint'], collector['kpi'],
        #         collector['duration'], collector['sample_interval']
        #         )

        if not resource_to_subscribe:
            LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Resource to subscribe not found.")
@@ -170,8 +185,26 @@ class TelemetryBackendService(GenericGrpcService):

        for samples in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}")
            self.GenerateKpiValue(collector_id, kpi_id, samples)

            if isinstance(samples, dict):
                self.GenerateKpiValue(collector_id, kpi_id, sample_value)
                # inn_dict = samples.get('update', {})
                # LOGGER.info(f"KPI ID: {kpi_id} - Inner Dictionary: {inn_dict}")
                # list_update = inn_dict.get('update', [])
                # LOGGER.info(f"KPI ID: {kpi_id} - List Update: {list_update}")
                # if len(list_update) > 0:
                #     sample_value = list_update[0].get('val')
                #     self.GenerateKpiValue(collector_id, kpi_id, sample_value)
            '''
{
    'update': {
        'timestamp': 1772103489806507669,
        'update': 
        [
            {'path': 'openconfig-wavelength-router:wavelength-router/flex-scale-mg-on:optical-bands/optical-band[index=2]/state/optical-power-total-input/instant', 'val': -2.51}
        ]
    }
 }
            '''
            # TODO: Stop_event should be managed in this method because there will be no more specific collector
            if stop_event.is_set():
                self.device_collector.Disconnect()
+13 −0
Original line number Diff line number Diff line
@@ -56,3 +56,16 @@ if LOAD_ALL_DEVICE_DRIVERS:
            }
        ])
    )

if LOAD_ALL_DEVICE_DRIVERS:
    from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position
    COLLECTORS.append(
        (GNMIOpenConfigCollector, [
            {
                FilterFieldEnum.DEVICE_TYPE: [
                    DeviceTypeEnum.OPTICAL_ROADM,
                    DeviceTypeEnum.OPTICAL_TRANSPONDER
                ],
                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_OC,
            }
        ]))
Loading