Commit 0ed903a6 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Merge branch 'feat/304-cttc-netconf-based-openconfig-telemetry-collector' of...

Merge branch 'feat/304-cttc-netconf-based-openconfig-telemetry-collector' of ssh://labs.etsi.org:29419/tfs/controller into feat/396-sssa-integration-of-optical-controller-with-closed-loop-services
parents d830d55f 73c8f1d8
Loading
Loading
Loading
Loading
+42 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc

# Collector folders to exclude from measurement and reporting
OMIT_COLLECTORS="telemetry/backend/service/collectors/emulated/*"
OMIT_COLLECTORS="${OMIT_COLLECTORS},telemetry/backend/service/collectors/gnmi_oc/*"
OMIT_COLLECTORS="${OMIT_COLLECTORS},telemetry/backend/service/collectors/int_collector/*"

# Files to include in the HTML report:
#   - netconf_oc collector (the code under test)
#   - telemetry backend service layer (*.py directly in service/)
INCLUDE_REPORT="telemetry/backend/service/collectors/netconf_oc/*"
INCLUDE_REPORT="${INCLUDE_REPORT},telemetry/backend/service/*.py"

# This is unit test (should be tested with container-lab running)
python3 -m coverage run \
    --source=telemetry/backend/service \
    --omit="${OMIT_COLLECTORS}" \
    -m pytest --log-level=info --log-cli-level=info --verbose \
    telemetry/backend/tests/netconf_oc/test_unit_NetconfOCcollector.py

python3 -m coverage html \
    --directory=../coverage/ecoc26_netconf_oc_unit \
    --include="${INCLUDE_REPORT}"

echo "Bye!"
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ APScheduler>=3.10.4
confluent-kafka==2.3.*
deepdiff==6.7.*
kafka-python==2.0.6
ncclient>=0.6.15
numpy==2.0.1
pygnmi==0.8.14
pytz>=2025.2
+35 −4
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
import json
import uuid
import logging
from typing import Optional
from typing import List, Tuple, Optional
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from .collectors.int_collector.INTCollector  import INTCollector
@@ -24,8 +24,8 @@ from common.tools.context_queries.Device import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names
from common.tools.context_queries.Service    import get_service_by_uuid

from typing import List, Tuple, Optional
from .collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
from .collectors.netconf_oc.KPI import PLUGGABLE_KPI_TYPES

LOGGER = logging.getLogger(__name__)

@@ -120,6 +120,35 @@ def get_ip_subscriptions(
    return subscriptions


def get_pluggable_optical_subscription(
        kpi_id: str, kpi_descriptor, duration: float, interval: float,
) -> Optional[List[Tuple]]:
    """
    Build subscription parameters for pluggable optical-channel KPIs.
    The channel name is taken directly from the endpoint_id UUID stored
    in the KPI descriptor (e.g. 'channel-1').
    Returns:
        List with one subscription tuple, or None if endpoint is missing.
    """
    channel_name = kpi_descriptor.endpoint_id.endpoint_uuid.uuid
    if not channel_name:
        LOGGER.warning(f"KPI ID: {kpi_id} - endpoint_id UUID is empty. Skipping...")
        return None
    LOGGER.info(f"KPI ID: {kpi_id} - pluggable channel: {channel_name}")
    return [
        (
            str(uuid.uuid4()),
            {
                "kpi"      : kpi_descriptor.kpi_sample_type,
                "endpoint" : channel_name,
                "resource" : "optical-channel",
            },
            float(duration),
            float(interval),
        )
    ]


def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval, resource: str = "",
        ) -> Optional[List[Tuple]]:
@@ -154,8 +183,10 @@ def get_subscription_parameters(
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")
    
    # Route to appropriate subscription handler based on device type
    if device.device_type in ['optical-roadm', 'optical-transponder']:
    # Route to appropriate subscription handler
    if kpi_descriptor.kpi_sample_type in PLUGGABLE_KPI_TYPES:
        return get_pluggable_optical_subscription(kpi_id, kpi_descriptor, duration, interval)
    elif device.device_type in ['optical-roadm', 'optical-transponder']:
        return get_optical_subscription(kpi_id, kpi_descriptor, context_client, duration, interval, resource="wavelength-router")
    else:
        return get_ip_subscriptions(kpi_id, kpi_descriptor, device, context_client, duration, interval, resource="interface")
+4 −6
Original line number Diff line number Diff line
@@ -64,7 +64,6 @@ class TelemetryBackendService(GenericGrpcService):
        listener for requests on Kafka topic.
        """
        LOGGER.info('Telemetry backend request listener is running ...')
        # print      ('Telemetry backend request listener is running ...')
        consumer = self.kafka_consumer
        consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
        while True:
@@ -183,13 +182,12 @@ class TelemetryBackendService(GenericGrpcService):
            else:
                LOGGER.info(f"Subscription successful for KPI ID: {kpi_id} - Status: {status}")
                
        sample_value = None 
        for samples in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}")
            if isinstance(samples, dict):
                self.GenerateKpiValue(collector_id, kpi_id, sample_value)
        for (timestamp, resource_key, value) in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - resource={resource_key} value={value}")
            self.GenerateKpiValue(collector_id, kpi_id, value)
            if stop_event.is_set():
                self.device_collector.Disconnect()
                break

    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
+7 −8
Original line number Diff line number Diff line
@@ -58,14 +58,13 @@ if LOAD_ALL_DEVICE_DRIVERS:
    )

if LOAD_ALL_DEVICE_DRIVERS:
    from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position
    from .netconf_oc.NetconfOCCollector import NetconfOpenConfigCollector # pylint: disable=wrong-import-position
    COLLECTORS.append(
        (GNMIOpenConfigCollector, [
        (NetconfOpenConfigCollector, [
            {
                FilterFieldEnum.DEVICE_TYPE: [
                    DeviceTypeEnum.OPTICAL_ROADM,
                    DeviceTypeEnum.OPTICAL_TRANSPONDER
                ],
                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_OC,
                # Pluggable host devices (packet-router with NETCONF/OpenConfig driver)
                FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER,
                FilterFieldEnum.DRIVER     : DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG,
            }
        ]))
        ])
    )
Loading