Commit c71a2a27 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Enhanced Telemetry backend service to reflect the changes made by the gNMI OC collector

- New helper method added
  + get_collector_by_kpi_id to select appropriate collector
  + get_subscription_parameters to generate the subscription

- Updated GenericCollectorHandler is introduced
  + Now this method will handle all types of collectors without the need for a separate method for each collector.
parent 37cae078
Loading
Loading
Loading
Loading
+82 −1
Original line number Diff line number Diff line
@@ -12,18 +12,99 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
import logging
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.context_queries.EndPoint   import get_endpoint_names

LOGGER = logging.getLogger(__name__)

def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval
        ) -> list[tuple] | None:
    """
    Build the subscription list for the device referenced by a KPI.

    The KPI descriptor is fetched through ``kpi_manager_client``, the device it
    references is loaded through ``context_client``, and one subscription tuple
    is generated for every endpoint of that device.

    Args:
        kpi_id: UUID of the KPI, as a string.
        kpi_manager_client: Object exposing ``GetKpiDescriptor(KpiId)``.
        context_client: Client handed to ``get_device`` and ``get_endpoint_names``.
        duration: Value converted with ``float()``; third element of each tuple.
        interval: Value converted with ``float()``; fourth element of each tuple.

    Returns:
        ``None`` when the KPI descriptor is falsy. Otherwise a list (possibly
        empty, when the device has no endpoints) of tuples shaped as:
            - Subscription ID (str): a freshly generated UUID4.
            - Dictionary with:
                - "kpi": the descriptor's ``kpi_sample_type`` (not the KPI ID).
                - "endpoint": first item of the ``get_endpoint_names`` entry for
                  the endpoint UUID (presumably the endpoint name, e.g. 'eth0'
                  -- confirm against ``get_endpoint_names``).
                - "resource": the fixed string 'interface'.
            - ``float(duration)``
            - ``float(interval)``

    Raises:
        Exception: when the device referenced by the descriptor is not found.
        KeyError: when ``get_endpoint_names`` returns no entry for one of the
            device endpoints.

    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    """
    kpi_id_obj = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    if not kpi_descriptor:
        LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
        return None

    kpi_sample_type = kpi_descriptor.kpi_sample_type
    LOGGER.info(f"KPI Descriptor (KPI Sample Type): {kpi_sample_type}")

    device = get_device( context_client       = context_client,
                         device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                         include_config_rules = False,
                         include_components   = False
                         )
    if not device:
        raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.")

    # Every endpoint of the device gets its own subscription entry.
    endpoint_ids = [endpoint.endpoint_id for endpoint in device.device_endpoints]
    for endpoint_id in endpoint_ids:
        LOGGER.info(f"Endpoint UUID: {endpoint_id}")

    # Only the per-endpoint data is needed; the device-name mapping is discarded.
    _, endpoint_data = get_endpoint_names(
        context_client = context_client,
        endpoint_ids   = endpoint_ids
    )

    subscriptions = []
    for endpoint_id in endpoint_ids:
        # Look the entry up once and reuse it for both the log line and the tuple.
        endpoint_name = endpoint_data[endpoint_id.endpoint_uuid.uuid][0]
        LOGGER.info(f"Endpoint names only: {endpoint_name}")
        subscriptions.append(
            (
                str(uuid.uuid4()),  # unique subscription ID per endpoint
                {
                    "kpi"      : kpi_sample_type,
                    "endpoint" : endpoint_name,
                    "resource" : 'interface',
                },
                float(duration),
                float(interval),
            )
        )
    return subscriptions


def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> _Collector | None:
    """
    Method to get device_type and endpoint detail based on kpi_id.
    Method to get a collector instance based on KPI ID.
    Preconditions:
        - A KPI Descriptor must be added in KPI DB with correct device_id.
        - The device must be available in the context.
    Returns:
        - Collector instance if found, otherwise None.
    Raises:
        - Exception if the KPI ID is not found or the collector cannot be created.
    """
    LOGGER.info(f"Getting collector for KPI ID: {kpi_id}")
    kpi_id_obj = KpiId()
+53 −96
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ import time
import logging
import threading

from .HelperMethods   import get_collector_by_kpi_id
from .HelperMethods   import get_collector_by_kpi_id, get_subscription_parameters
from common.Constants import ServiceNameEnum
from common.Settings  import get_service_port_grpc
from confluent_kafka  import Consumer as KafkaConsumer
@@ -130,38 +130,56 @@ class TelemetryBackendService(GenericGrpcService):
        """
        Method to handle collector request.
        """
        device_type, collector, resource_key = self.get_subscription_detail_by_kpi_id(kpi_id)
        if device_type and "emu" in device_type:
            LOGGER.info("KPI ID: {:} - Device Type: {:} - Resource Key: {:}".format(kpi_id, device_type, resource_key))
            endpoints = self.get_endpoint_detail_by_kpi_id(kpi_id)
            subscription = [collector_id, endpoints , duration, interval]
            self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event)
        else:
            if collector:
                self.collector = collector
                self.collector.Connect()
                resources_to_subscribe = [(resource_key, duration, interval)]
                self.collector.SubscribeState(resources_to_subscribe)
                while not stop_event.is_set():
                        samples = list(self.collector.GetState(duration=duration, blocking=True))
                        LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
                        self.GenerateKpiValue(collector_id, kpi_id, samples)
                        time.sleep(1)
                self.collector.Disconnect()
        # CONFIRM: The method (get_collector_by_kpi_id) is working correctly. testcase in integration tests.
        self.device_collector = get_collector_by_kpi_id(
            kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache)

        if not self.device_collector:
            LOGGER.warning(f"KPI ID: {kpi_id} - Collector not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Collector not found.")
        LOGGER.info(("----- Number done -----"))

        # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetry backend tests.
        resource_to_subscribe = get_subscription_parameters(
            kpi_id, self.kpi_manager_client, self.context_client, duration, interval
        )
        if not resource_to_subscribe:
            LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...")
            raise Exception(f"KPI ID: {kpi_id} - Resource to subscribe not found.")
        LOGGER.info("------ Number done 2 -----")

    def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event):
            # EmulatedCollector
            self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
            self.collector.Connect()
            if not self.collector.SubscribeState(subscription):
                LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
        responses = self.device_collector.SubscribeState(resource_to_subscribe)

        for status in responses:
            if isinstance(status, Exception):
                LOGGER.error(f"Subscription failed for KPI ID: {kpi_id} - Error: {status}")
                raise status
            else:
                while not stop_event.is_set():
                    samples = list(self.collector.GetState(duration=duration, blocking=True))
                    LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
                LOGGER.info(f"Subscription successful for KPI ID: {kpi_id} - Status: {status}")
        LOGGER.info("------ Number done 3 -----")

        for samples in self.device_collector.GetState(duration=duration, blocking=True):
            LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}")
            self.GenerateKpiValue(collector_id, kpi_id, samples)
                    time.sleep(1)
            self.collector.Disconnect()

        LOGGER.info("------ Number done 4 -----")
        self.device_collector.Disconnect()
        # TODO: stop_event should be managed here in this method because there will be no more collector-specific methods.

    
    # def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event):
    #         # EmulatedCollector
    #         self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
    #         self.collector.Connect()
    #         if not self.collector.SubscribeState(subscription):
    #             LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
    #         else:
    #             while not stop_event.is_set():
    #                 samples = list(self.collector.GetState(duration=duration, blocking=True))
    #                 LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
    #                 self.GenerateKpiValue(collector_id, kpi_id, samples)
    #                 time.sleep(1)
    #         self.collector.Disconnect()

    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
        """
@@ -181,6 +199,10 @@ class TelemetryBackendService(GenericGrpcService):
        )
        producer.flush()

    def delivery_callback(self, err, msg):
        """
        Log a failed message delivery.

        Does nothing when `err` is falsy. `msg` is accepted but not used.
        """
        if not err:
            return
        LOGGER.error('Message delivery failed: {:s}'.format(str(err)))

    def TerminateCollector(self, job_id):
        LOGGER.debug("Terminating collector backend...")
        try:
@@ -202,68 +224,3 @@ class TelemetryBackendService(GenericGrpcService):
        except:
            LOGGER.exception("Error terminating job: {:}".format(job_id))
    def get_subscription_detail_by_kpi_id(self, kpi_id: str):
        """
        Resolve the KPI descriptor, its device and the device driver for a KPI ID.

        Returns a 3-tuple. As currently written every path returns
        (None, None, None): the branch that would return
        (device_type, collector, resource_key) is commented out below.

        NOTE(review): the "Device not found" warning is emitted even when
        get_device() returned a device, and get_driver() is called before
        `device` is checked -- confirm whether this is leftover test scaffolding.
        """
        # Wrap the plain string ID in the message type expected by the KPI manager.
        kpi_id_obj = KpiId()
        kpi_id_obj.kpi_id.uuid = kpi_id
        kpi_descriptor = self.kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
        if not kpi_descriptor:
            LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
            return (None, None, None)

        device_uuid       = kpi_descriptor.device_id.device_uuid.uuid
        # endpoint_uuid and kpi_sample_type are only referenced by the disabled branch below.
        endpoint_uuid     = kpi_descriptor.endpoint_id.endpoint_uuid.uuid
        kpi_sample_type   = kpi_descriptor.kpi_sample_type

        device = get_device( context_client       = self.context_client,
                             device_uuid          = device_uuid,
                             include_config_rules = False,
                             include_components   = False,
                             )
        # Getting device collector (testing)
        # The collector is obtained but never returned while the branch below stays disabled.
        collector : _Collector = get_driver(self.driver_instance_cache, device)
        # if device:
        #     monitoring_loops = MonitoringLoops()
        #     resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
        #     return (device.device_type, collector, resource_key)
        LOGGER.warning(f"Device not found: {device_uuid}")
        return (None, None, None)

    def get_endpoint_detail_by_kpi_id(self, kpi_id: str):
        """
        Describe the endpoint referenced by a KPI descriptor.

        Fetches the KPI descriptor, loads the device it points to and searches
        the device endpoints for the UUID stored in the descriptor.

        Returns:
            A dict with the keys "uuid", "name", "type" and "sample_types" for
            the matching endpoint, or None when the descriptor, the device or
            the endpoint is not found (a warning is logged in each case).
        """
        request = KpiId()
        request.kpi_id.uuid = kpi_id
        descriptor = self.kpi_manager_client.GetKpiDescriptor(request)
        if not descriptor:
            LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
            return None

        device_id   = descriptor.device_id.device_uuid.uuid
        endpoint_id = descriptor.endpoint_id.endpoint_uuid.uuid
        device = get_device( context_client       = self.context_client,
                             device_uuid          = device_id,
                             include_config_rules = False,
                             include_components   = False,
                             )

        # A missing device is handled like a device without a matching endpoint.
        candidates = device.device_endpoints if device else ()
        for candidate in candidates:
            if candidate.endpoint_id.endpoint_uuid.uuid != endpoint_id:
                continue
            return {
                "uuid"         : candidate.endpoint_id.endpoint_uuid.uuid,
                "name"         : candidate.name,
                "type"         : candidate.endpoint_type,
                "sample_types" : list(candidate.kpi_sample_types),
            }

        LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found")
        return None

    def delivery_callback(self, err, msg):
        """
        Log a failed message delivery.

        Does nothing when `err` is falsy. `msg` is accepted but not used.
        """
        if not err:
            return
        LOGGER.error('Message delivery failed: {:s}'.format(str(err)))