Commit f7d0faea authored by Georgios P. Katsikas's avatar Georgios P. Katsikas
Browse files

fix: telemetry backend and INT collector fixes

parent e9ab7c48
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -17,11 +17,11 @@
apiVersion: metallb.io/v1beta1
kind: IPAddressPool
metadata:
  name: my-ip-pool
  name: "metallb-address-pool"
  namespace: metallb-system
spec:
  addresses:
    - 192.168.5.250-192.168.5.251  # <-- Change this to match your network
    - 10.10.10.41-10.10.10.42
---
apiVersion: metallb.io/v1beta1
kind: L2Advertisement
+61 −16
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ import uuid
import logging
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from .collectors.int_collector.INTCollector  import INTCollector
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names
@@ -61,19 +62,18 @@ def get_subscription_parameters(
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")
    endpoints = device.device_endpoints

    # LOGGER.info(f"Device for KPI ID: {kpi_id} - {endpoints}")
    # LOGGER.info(f"--------------------")
    LOGGER.debug(f"Device for KPI ID: {kpi_id} - {endpoints}")
    endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints]
    # for endpoint_id in endpoints:
    #     LOGGER.info(f"Endpoint UUID: {endpoint_id.endpoint_id}")
    for endpoint_id in endpoints:
        LOGGER.debug(f"Endpoint UUID: {endpoint_id.endpoint_id}")
        
    # Getting endpoint names
    device_names, endpoint_data = get_endpoint_names(
        context_client = context_client,
        endpoint_ids   = endpointsIds
    )
    # LOGGER.info(f"Device names: {device_names}")
    # LOGGER.info(f"Endpoint data: {endpoint_data}")
    LOGGER.debug(f"Device names: {device_names}")
    LOGGER.debug(f"Endpoint data: {endpoint_data}")

    subscriptions = []
    sub_id = None
@@ -94,7 +94,6 @@ def get_subscription_parameters(
        )
    return subscriptions


def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> Optional[_Collector]:
    """
@@ -111,13 +110,13 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    kpi_id_obj = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    # LOGGER.info(f"KPI Descriptor: {kpi_descriptor}")
    if not kpi_descriptor:
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")
    
    # device_uuid       = kpi_descriptor.device_id.device_uuid.uuid
    device = get_device( context_client       = context_client,
                         device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
    device_uuid = kpi_descriptor.device_id.device_uuid.uuid
    device = get_device(
        context_client       = context_client,
        device_uuid          = device_uuid,
        include_config_rules = True,
        include_components   = False,
    )
@@ -126,5 +125,51 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    collector : _Collector = get_driver(driver_instance_cache, device)
    if collector is None:
        raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.")
    # LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    return collector

def get_node_level_int_collector(collector_id: str, kpi_id: str, address: str, interface: str, port: int,
            service_id: str, context_id: str) -> Optional[_Collector]:
    """
    Method to instantiate an in-band network telemetry collector at a node level.
    Such a collector binds to a physical/virtual interface of a node, expecting
    packets from one or more switches.
    Every packet contains multiple KPIs, therefore this collector is not bound to
    a single KPI.
    Returns:
        - Collector instance if found, otherwise None.
    Raises:
        - Exception if the KPI ID is not found or the collector cannot be created.
    """

    LOGGER.debug(f"INT collector         ID: {collector_id}")
    LOGGER.debug(f"INT collector    address: {address}")
    LOGGER.debug(f"INT collector       port: {port}")
    LOGGER.debug(f"INT collector  interface: {interface}")
    LOGGER.debug(f"INT collector     kpi_id: {kpi_id}")
    LOGGER.debug(f"INT collector service_id: {service_id}")
    LOGGER.debug(f"INT collector context_id: {context_id}")
    # Initialize an INT collector
    try:
        collector : _Collector = INTCollector(
            address=address,
            port=port,
            collector_id=collector_id,
            interface=interface,
            kpi_id=kpi_id,
            service_id=service_id,
            context_id=context_id
        )
    except Exception as ex:
        LOGGER.exception(f"Failed to create INT Collector object on node {address}, {interface}:{port}")

    connected = False
    if not collector:
        return None
    LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")

    try:
        connected = collector.Connect()
    except Exception as ex:
        LOGGER.exception(f"Failed to connect INT Collector on node {address}, {interface}:{port}")

    return collector if connected else None
+44 −56
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ import time
import logging
import threading

from .HelperMethods   import get_collector_by_kpi_id, get_subscription_parameters
from .HelperMethods   import get_collector_by_kpi_id, get_subscription_parameters, get_node_level_int_collector
from common.Constants import ServiceNameEnum
from common.Settings  import get_service_port_grpc
from confluent_kafka  import Consumer as KafkaConsumer
@@ -26,12 +26,8 @@ from confluent_kafka import Producer as KafkaProducer
from datetime         import datetime, timezone
from typing           import Any, Dict

from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import DriverInstanceCache, get_driver
from .collectors.emulated.EmulatedCollector  import EmulatedCollector
from .collector_api.DriverInstanceCache      import DriverInstanceCache
from common.method_wrappers.Decorator        import MetricsPool
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.kafka.Variables            import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.client.ContextClient            import ContextClient
@@ -56,7 +52,6 @@ class TelemetryBackendService(GenericGrpcService):
                                            'auto.offset.reset'  : 'latest'})
        self.driver_instance_cache = driver_instance_cache
        self.device_collector      = None
        self.collector             = None           # This should be replaced with device_collector (later to be removed)
        self.context_client        = ContextClient()
        self.kpi_manager_client    = KpiManagerClient()
        self.active_jobs = {}
@@ -84,18 +79,18 @@ class TelemetryBackendService(GenericGrpcService):
                        f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.")
                    continue
                else:
                    LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                    LOGGER.error(f"Consumer error: {receive_msg.error()}")
                    break
            try: 
                collector = json.loads(receive_msg.value().decode('utf-8'))
                collector_id = receive_msg.key().decode('utf-8')
                LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
                LOGGER.debug(f"Received Collector: {collector_id} - {collector}")

                duration = collector.get('duration', 0)
                if duration == -1 and collector['interval'] == -1:
                    self.TerminateCollector(collector_id)
                else:
                    LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id))
                    LOGGER.info(f"Received Collector ID: {collector_id} - Scheduling...")
                    if collector_id not in self.active_jobs:
                        stop_event = threading.Event()
                        self.active_jobs[collector_id] = stop_event
@@ -117,7 +112,7 @@ class TelemetryBackendService(GenericGrpcService):
                                time.sleep(completion_time)
                                if not stop_event.is_set():
                                    LOGGER.warning(
                                        f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
                                        f"Execution duration ({completion_time}) completed for Collector: {collector_id}")
                                    self.TerminateCollector(collector_id)

                            duration_thread = threading.Thread(
@@ -126,66 +121,60 @@ class TelemetryBackendService(GenericGrpcService):
                            )
                            duration_thread.start()
                    else:
                        LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
                        LOGGER.warning(f"Collector ID: {collector_id} - Already scheduled or running")
            except Exception as e:
                LOGGER.warning(
                    "Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
                    f"Unable to consume message from topic: {KafkaTopic.TELEMETRY_REQUEST.value}. ERROR: {e}")

    def GenericCollectorHandler(self, collector_id, kpi_id, duration, interval, interface, port, service_id, context_id, stop_event):
        """
        Method to handle collector request.
        """
        # CONFIRM: The method (get_collector_by_kpi_id) is working correctly. testcase in integration tests.

        # INT collector invocation
        if interface:
            self.device_collector = get_node_level_int_collector(
                collector_id=collector_id,
                kpi_id=kpi_id,
                address="127.0.0.1",
                interface=interface,
                port=port,
                service_id=service_id,
                context_id=context_id
            )
            return
        # Rest of the collectors
        else:
            self.device_collector = get_collector_by_kpi_id(
                kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache)

        if not self.device_collector:
            LOGGER.warning(f"KPI ID: {kpi_id} - Collector not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Collector not found.")
        LOGGER.info(("----- Number done -----"))

        # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetery backend tests.
        # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetry backend tests
        resource_to_subscribe = get_subscription_parameters(
            kpi_id, self.kpi_manager_client, self.context_client, duration, interval
        )
        if not resource_to_subscribe:
            LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Resource to subscribe not found.")
        LOGGER.info("------ Number done 2 -----")

        responses = self.device_collector.SubscribeState(resource_to_subscribe)

        for status in responses:
            if isinstance(status, Exception):
                LOGGER.error(f"Subscription failed for KPI ID: {kpi_id} - Error: {status}")
                raise status
            else:
                LOGGER.info(f"Subscription successful for KPI ID: {kpi_id} - Status: {status}")
        LOGGER.info("------ Number done 3 -----")

        for samples in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}")
            self.GenerateKpiValue(collector_id, kpi_id, samples)

        LOGGER.info("------ Number done 4 -----")
        self.device_collector.Disconnect()
        # TODO: Stop_event should be managed here is this method because there will be no more specific collector.

# --- START: Kept for INT compatibility, to be removed later ---
    def INTCollectorHandler(self, subscription, duration, collector_id, kpi_id, interface, port, service_id, context_id , stop_event):
        self.collector = INTCollector(
            collector_id=collector_id ,
            address="127.0.0.1",
            interface=interface,
            port=port,
            kpi_id=kpi_id,
            service_id=service_id,
            context_id=context_id
        )
        self.collector.Connect()
        # TODO: Stop_event should be managed in this method because there will be no more specific collector
        if stop_event.is_set():
            self.collector.Disconnect()
# --- END: Kept for INT compatibility, to be removed later ---
            self.device_collector.Disconnect()

    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
@@ -207,25 +196,24 @@ class TelemetryBackendService(GenericGrpcService):

    def delivery_callback(self, err, msg):
        if err: 
            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
            LOGGER.error(f"Message delivery failed: {str(err)}")

    def TerminateCollector(self, job_id):
        LOGGER.debug("Terminating collector backend...")
        try:
            if job_id not in self.active_jobs:
                self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.")
                self.logger.warning(f"No active jobs found for {job_id}. It might have already been terminated.")
            else:
                LOGGER.info(f"Terminating job: {job_id}")
                stop_event = self.active_jobs.pop(job_id, None)
                if stop_event:
                    stop_event.set()
                    LOGGER.info(f"Job {job_id} terminated.")
                    if self.collector.UnsubscribeState(job_id):
                    if self.device_collector.UnsubscribeState(job_id):
                        LOGGER.info(f"Unsubscribed from collector: {job_id}")
                    else:
                        LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}")
                else:
                    LOGGER.warning(f"Job {job_id} not found in active jobs.")
        except:
            LOGGER.exception("Error terminating job: {:}".format(job_id))
            LOGGER.exception(f"Error terminating job: {job_id}")
+1 −3
Original line number Diff line number Diff line
@@ -34,8 +34,6 @@ COLLECTORS.append(
        },
    ]))

# TODO: Import for gNMI OpenConfig Collector ...

if LOAD_ALL_DEVICE_DRIVERS:
    from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position
    COLLECTORS.append(
@@ -49,7 +47,7 @@ if LOAD_ALL_DEVICE_DRIVERS:
    )

if LOAD_ALL_DEVICE_DRIVERS:
    from .intcollector.INTCollector import INTCollector # pylint: disable=wrong-import-position
    from .int_collector.INTCollector import INTCollector # pylint: disable=wrong-import-position
    COLLECTORS.append(
        (INTCollector, [
            {
+179 −86

File changed and moved.

Preview size limit exceeded, changes collapsed.

Loading