Commit 14e5486e authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch...

Merge branch 'feat/338-cttc-multiple-enhancements-in-telemetry-component-and-deployment-logic' into 'develop'

Resolve "(CTTC) Multiple enhancements in Telemetry component and deployment logic"

See merge request !436
parents b2b37d5a 2b0d9d77
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ enum KpiSampleType {
    KPISAMPLETYPE_ML_CONFIDENCE                 = 401;  //. can be used by both optical and L3 without any issue

    KPISAMPLETYPE_OPTICAL_SECURITY_STATUS       = 501;  //. can be used by both optical and L3 without any issue
    KPISAMPLETYPE_OPTICAL_POWER_TOTAL_INPUT     = 502;
    KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER     = 503;

    KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS        = 601;
+35 −2
Original line number Diff line number Diff line
@@ -19,11 +19,44 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9092'
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
# RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9094'

# Discover the addresses of the services the tests talk to and export them
# as environment variables. Each lookup lists resources in every namespace,
# keeps the line(s) matching the service name and takes the 4th
# whitespace-separated field (presumably the CLUSTER-IP column — confirm).
# NOTE(review): if grep matches nothing the variable is exported empty.
IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}')
export IP_KPI
echo "KPI Manager Service IP: ${IP_KPI}"

# Same lookup for the Telemetry service.
IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}')
export IP_TELE
echo "Telemetry Frontend Service IP: ${IP_TELE}"

# Same lookup for the Context service.
IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}')
export IP_CONTEXT
echo "Context Service IP: ${IP_CONTEXT}"

# Start Kafka port-forward in background
# Forwards local port 9094 to port 9094 of service/kafka-public in the
# "kafka" namespace; all output of the port-forward is discarded.
kubectl port-forward -n kafka service/kafka-public 9094:9094 > /dev/null 2>&1 &
# PID of the background port-forward, needed to stop it later.
KAFKA_PF_PID=$!

# Function to cleanup port-forward on exit
# Failures of kill/wait are ignored (|| true), so running this more than
# once, or after the process has already ended, does not raise an error.
cleanup() {
    # echo "Cleaning up Kafka port-forward (PID: ${KAFKA_PF_PID})..."
    kill ${KAFKA_PF_PID} 2>/dev/null || true
    wait ${KAFKA_PF_PID} 2>/dev/null || true
    sleep 1
}

# Run cleanup when the script exits and also on SIGINT/SIGTERM.
trap cleanup EXIT INT TERM
echo "Waiting for Kafka port-forward to be ready..."
sleep 1

# Verify port-forward is working
# The probe is advisory only: when the port is not reachable a warning is
# printed and the script continues.
if ! nc -z 127.0.0.1 9094 2>/dev/null; then
    echo "WARNING: Kafka port-forward may not be ready yet"
fi

python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
    telemetry/backend/tests/test_backend.py
+13 −0
Original line number Diff line number Diff line
@@ -19,6 +19,19 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc

export KFK_SERVER_ADDRESS='127.0.0.1:9092'


# Discover the addresses of the services the tests talk to and export them
# as environment variables. Each lookup lists resources in every namespace,
# keeps the line(s) matching the service name and takes the 4th
# whitespace-separated field (presumably the CLUSTER-IP column — confirm).
# NOTE(review): if grep matches nothing the variable is exported empty.
IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}')
export IP_KPI
echo "KPI Manager Service IP: ${IP_KPI}"

# Same lookup for the Telemetry service.
IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}')
export IP_TELE
echo "Telemetry Frontend Service IP: ${IP_TELE}"

# Same lookup for the Context service.
IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}')
export IP_CONTEXT
echo "Context Service IP: ${IP_CONTEXT}"

# This is a unit test (it should be run with container-lab running)
python3 -m pytest --log-level=info --log-cli-level=info --verbose \
    telemetry/backend/tests/gnmi_oc/test_unit_GnmiOpenConfigCollector.py::test_full_workflow
+103 −41
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import uuid
import logging
from typing import Optional
@@ -21,51 +22,72 @@ from .collectors.int_collector.INTCollector import INTCollector
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names
from typing import List, Tuple, Optional
from telemetry.backend.service.collectors.gnmi_oc.KPI import KPI
from common.tools.context_queries.Service    import get_service_by_uuid

from typing import List, Tuple, Optional
from .collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector

LOGGER = logging.getLogger(__name__)

def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, resource, duration, interval
def get_optical_subscription(
        kpi_id: str, kpi_descriptor, context_client, duration: float, interval: float, resource: str,
) -> Optional[List[Tuple]]:
    """
    Method to get subscription parameters based on KPI ID.
    Returns a list of tuples with subscription parameters.
    Each tuple contains:
        - Subscription ID (str)
        - Dictionary with:
            - "kpi" (str): KPI ID
            - "endpoint" (str): Endpoint name (e.g., 'eth0')
            - "resource" (str): Resource type (e.g., 'interface')
        - Sample interval (float)
        - Report interval (float)
    If the KPI ID is not found or the device is not available, returns None.
    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    Get subscription parameters for optical devices (optical-roadm, optical-transponder) 
    using service uuid from KPI descriptor to find the endpoint in service config rules.    
    Returns:
        List of subscription tuples or None if unable to create subscription
    """
    kpi_id_obj             = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor         = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
    _service_id = kpi_descriptor.service_id.service_uuid.uuid
    LOGGER.info(f"KPI Descriptor (Service ID): {_service_id}")
    
    service = get_service_by_uuid(context_client, _service_id)
    if not service:
        LOGGER.warning(f"KPI ID: {kpi_id} - Service not found for Service ID: {_service_id}. Skipping...")
        return None
    
    kpi_sample_type = kpi_descriptor.kpi_sample_type
    LOGGER.info(f"KPI Descriptor (KPI Sample Type): {kpi_sample_type}")
    LOGGER.debug(f"Service for KPI ID: {kpi_id} - {service.name} - {service.service_config}")
    if not service.service_config.config_rules:
        LOGGER.warning(f"KPI ID: {kpi_id} - No config rules in service config. Skipping...")
        return None

    device = get_device( context_client       = context_client,
                         device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                         include_config_rules = False,
                         include_components   = False
    # Get the first config rule's custom resource_value
    config_rule = service.service_config.config_rules[0]
    if not config_rule.HasField('custom'):
        LOGGER.warning(f"KPI ID: {kpi_id} - No custom config in service config. Skipping...")
        return None

    resource_value = json.loads(config_rule.custom.resource_value)
    
    if 'ob_id' not in resource_value:
        LOGGER.warning(f"KPI ID: {kpi_id} - Resource ob_id not found in service config. Skipping...")
        return None

    endpoint = resource_value['ob_id']
    return [
        (
            str(uuid.uuid4()),
            {
                "kpi"      : kpi_descriptor.kpi_sample_type,
                "endpoint" : endpoint,
                "resource" : resource,
            },
            float(duration),
            float(interval),
        )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")     #TODO: Change to TFS NotFoundException 
    endpoints = device.device_endpoints
    ]


def get_ip_subscriptions(
        kpi_id: str, kpi_descriptor, device, context_client, duration: float, interval: float, resource: str,
) -> Optional[List[Tuple]]:
    """
    Get subscription parameters for IP/packet devices (routers, switches, etc.) 
    using device endpoints from context to create subscriptions for each endpoint.
    Returns:
        List of subscription tuples (one per endpoint) or None if unable to create subscriptions
    """
    endpoints = device.device_endpoints
    LOGGER.debug(f"Device for KPI ID: {kpi_id} - {endpoints}")
    endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints]
    for endpoint_id in endpoints:
@@ -80,17 +102,16 @@ def get_subscription_parameters(
    LOGGER.debug(f"Endpoint data: {endpoint_data}")

    subscriptions = []
    sub_id = None
    for endpoint in endpointsIds:
        sub_id = str(uuid.uuid4())  # Generate a unique subscription ID
        sub_id = str(uuid.uuid4())
        LOGGER.info(f"Endpoint names only: {endpoint_data[endpoint.endpoint_uuid.uuid][0]}") 
        subscriptions.append(
            (
                sub_id,  # Example subscription ID
                sub_id,
                {
                    "kpi"      : kpi_sample_type,   # As request is based on the single KPI so it should have only one endpoint
                    "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0],  # Endpoint name
                    "resource" : resource,          # Example resource type is 'interface' or 'wavelength-router' for MG-ON, this should be defined in the KPI Descriptor or as part of the request
                    "kpi"      : kpi_descriptor.kpi_sample_type,
                    "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0],
                    "resource" : resource,
                },
                float(duration),
                float(interval),
@@ -98,6 +119,47 @@ def get_subscription_parameters(
        )
    return subscriptions


def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval, resource: str = "",
        ) -> Optional[List[Tuple]]:
    """
    Build the collector subscription parameters for a KPI.

    The KPI descriptor is fetched from the KPI Manager, the device it refers to
    is looked up in the Context, and the request is routed to the optical or the
    IP/packet subscription builder according to the device type.

    Args:
        kpi_id: UUID of the KPI whose descriptor is requested.
        kpi_manager_client: Client exposing ``GetKpiDescriptor(KpiId)``.
        context_client: Client handed to the context queries.
        duration: Forwarded to the builder; emitted as ``float(duration)`` in
            every subscription tuple.
        interval: Forwarded to the builder; emitted as ``float(interval)`` in
            every subscription tuple.
        resource: Optional resource type override. When empty (the default) the
            resource type is derived from the device type: 'wavelength-router'
            for 'optical-roadm' / 'optical-transponder' devices and 'interface'
            for every other device.

    Returns:
        A list of tuples, each containing:
            - Subscription ID (str)
            - Dictionary with:
                - "kpi": KPI sample type taken from the KPI descriptor
                - "endpoint" (str): Endpoint name (e.g., 'eth0')
                - "resource" (str): Resource type (e.g., 'interface')
            - ``float(duration)``
            - ``float(interval)``
        None when the KPI descriptor is not found or the selected builder
        cannot create a subscription.

    Raises:
        Exception: If the device referenced by the KPI descriptor is not found
            in the Context.

    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    """
    kpi_id_obj             = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor         = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
        return None

    device = get_device(context_client       = context_client,
                        device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                        include_config_rules = False,
                        include_components   = False
                        )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")

    # Route to appropriate subscription handler based on device type.
    # An explicit, non-empty `resource` wins over the per-device-type default;
    # previously the argument was accepted but silently discarded.
    if device.device_type in ('optical-roadm', 'optical-transponder'):
        return get_optical_subscription(
            kpi_id, kpi_descriptor, context_client, duration, interval,
            resource=resource or "wavelength-router")
    return get_ip_subscriptions(
        kpi_id, kpi_descriptor, device, context_client, duration, interval,
        resource=resource or "interface")

def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> Optional[_Collector]:
    """
+13 −37
Original line number Diff line number Diff line
@@ -150,15 +150,15 @@ class TelemetryBackendService(GenericGrpcService):
            )
            return
        # Rest of the collectors
        elif context_id == "43813baf-195e-5da6-af20-b3d0922e71a7":
            self.device_collector = get_mgon_collector(
                address     = collector['host'],            # "172.17.254.24",
                port        = collector['port'],            # 50061,
                username    = collector['username'],        # "admin",
                password    = collector['password'],        # "admin",
                insecure    = collector.get('insecure', True),
                skip_verify = collector.get('skip_verify', True)
            )
        # elif context_id == "43813baf-195e-5da6-af20-b3d0922e71a7":
        #     self.device_collector = get_mgon_collector(
        #         address     = collector['host'],            # "172.17.254.24",
        #         port        = collector['port'],            # 50061,
        #         username    = collector['username'],        # "admin",
        #         password    = collector['password'],        # "admin",
        #         insecure    = collector.get('insecure', True),
        #         skip_verify = collector.get('skip_verify', True)
        #     )
        else:
            self.device_collector = get_collector_by_kpi_id(
                kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache)
@@ -167,14 +167,8 @@ class TelemetryBackendService(GenericGrpcService):
            LOGGER.warning(f"KPI ID: {kpi_id} - Collector not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Collector not found.")

        # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetry backend tests
        # resource_to_subscribe = get_subscription_parameters(
        #     kpi_id, self.kpi_manager_client, self.context_client, resource, duration, interval
        # )
        # TODO: Remove after confirming get_subscription_parameters generic is working correctly
        resource_to_subscribe = get_mgon_subscription_parameters(
                collector['resource'], collector['endpoint'], collector['kpi'],
                collector['duration'], collector['sample_interval']
        resource_to_subscribe = get_subscription_parameters(
            kpi_id, self.kpi_manager_client, self.context_client, duration, interval
        )

        if not resource_to_subscribe:
@@ -193,25 +187,7 @@ class TelemetryBackendService(GenericGrpcService):
        for samples in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}")
            if isinstance(samples, dict):
                inn_dict = samples.get('update', {})
                LOGGER.info(f"KPI ID: {kpi_id} - Inner Dictionary: {inn_dict}")
                list_update = inn_dict.get('update', [])
                LOGGER.info(f"KPI ID: {kpi_id} - List Update: {list_update}")
                if len(list_update) > 0:
                    sample_value = list_update[0].get('val')
                self.GenerateKpiValue(collector_id, kpi_id, sample_value)
            '''
{
    'update': {
        'timestamp': 1772103489806507669,
        'update': 
        [
            {'path': 'openconfig-wavelength-router:wavelength-router/flex-scale-mg-on:optical-bands/optical-band[index=2]/state/optical-power-total-input/instant', 'val': -2.51}
        ]
    }
 }
            '''
            # TODO: Stop_event should be managed in this method because there will be no more specific collector
            if stop_event.is_set():
                self.device_collector.Disconnect()

Loading