Commit f9655edb authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry Collector lifecycle is added.

- Minor changes in telemetry backend service.
- test file is updated to generate kafka request.
- telemtery consumes the request, start the collector and stops the collector when duration completed.
parent 425ca462
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -63,8 +63,8 @@ def get_subscription_parameters(
    # LOGGER.info(f"Device for KPI ID: {kpi_id} - {endpoints}")
    # LOGGER.info(f"--------------------")
    endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints]
    for endpoint_id in endpoints:
        LOGGER.info(f"Endpoint UUID: {endpoint_id.endpoint_id}")
    # for endpoint_id in endpoints:
    #     LOGGER.info(f"Endpoint UUID: {endpoint_id.endpoint_id}")
        
    # Getting endpoint names
    device_names, endpoint_data = get_endpoint_names(
@@ -110,7 +110,7 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    kpi_id_obj = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor     = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    LOGGER.info(f"KPI Descriptor: {kpi_descriptor}")
    # LOGGER.info(f"KPI Descriptor: {kpi_descriptor}")
    if not kpi_descriptor:
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")
    
@@ -125,5 +125,5 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri
    collector : _Collector = get_driver(driver_instance_cache, device)
    if collector is None:
        raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.")
    LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    # LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    return collector
+2 −4
Original line number Diff line number Diff line
@@ -86,9 +86,7 @@ class TelemetryBackendService(GenericGrpcService):
                    LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                    break
            try: 
                collector = json.loads(
                    receive_msg.value().decode('utf-8')
                )
                collector = json.loads(receive_msg.value().decode('utf-8'))
                collector_id = receive_msg.key().decode('utf-8')
                LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))

@@ -113,7 +111,7 @@ class TelemetryBackendService(GenericGrpcService):
                            def stop_after_duration(completion_time, stop_event):
                                time.sleep(completion_time)
                                if not stop_event.is_set():
                                    LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
                                    LOGGER.info(f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
                                    self.TerminateCollector(collector_id)
                                
                            duration_thread = threading.Thread(
+50 −4
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@
import logging
import pytest
import time
from typing import Dict
import json
# from common.tools.context_queries.EndPoint import get_endpoint_names
#from .EndPoint import get_endpoint_names        # modofied version of get_endpoint_names
from common.tools.kafka.Variables import KafkaTopic
@@ -34,13 +36,32 @@ from telemetry.backend.service.collector_api.DriverFactory import DriverFactory
from telemetry.backend.service.collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers
from telemetry.backend.service.collectors import COLLECTORS
from telemetry.backend.service.collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector


from confluent_kafka import Producer as KafkaProducer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


# --------------------------------------------------------------
# -------------------- HELPER METHODS --------------------------
# --------------------------------------------------------------

def publish_request_on_kafka(collector_obj, collector_uuid):
    kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
    kafka_producer.produce(
            KafkaTopic.TELEMETRY_REQUEST.value,
            key      = collector_uuid,
            value    = json.dumps(collector_obj),
            callback = delivery_callback
        )
    LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_obj))
    kafka_producer.flush()

def delivery_callback(err, msg):
    if err:
        LOGGER.warning('Message delivery failed: {:}'.format(err))

# --------------------------------------------------------------
# -------------------- FIXTURES --------------------------------
# --------------------------------------------------------------
@@ -67,8 +88,8 @@ def telemetry_backend_service():
    _service              = TelemetryBackendService(driver_instance_cache)
    _service.start()

    LOGGER.info('Preloading collectors...')
    preload_drivers(driver_instance_cache)
    # LOGGER.info('Preloading collectors...')
    # preload_drivers(driver_instance_cache)

    LOGGER.info('Yielding TelemetryBackendService...')
    yield _service
@@ -82,6 +103,30 @@ def telemetry_backend_service():
# -------------------- TESTS -----------------------------------
# --------------------------------------------------------------

# TODO: To test complete cycle of collector creation, subscription, and data retrieval, and termination by unsubscribing.
# ----- overall collector lifecycle test -----
# 1. Start telemetry backend service
# 2. Generate request on Kafka

# def test_collector_lifecycle(
#         telemetry_backend_service, kpi_manager_client, context_client
#         ):
#     time.sleep(5)  # Wait for the service to start
#     LOGGER.info('------ Waiting time for telemetry backend service is finished ------')
#     # Create a collector request
#     collector_obj : Dict = {
#         "kpi_id": "6e22f180-ba28-4641-b190-2287bf447777",
#         "duration": 10.0,
#         "interval": 3.0
#     }
#     collector_id = "6e22f180-ba28-4641-b190-2287bf444444"
#     publish_request_on_kafka(collector_obj, collector_id)
#     # Wait for the collector to be created and started
#     LOGGER.info('Waiting for collector to be created and started...')
#     time.sleep(30)  # Adjust the sleep time as needed for your environment

#     LOGGER.info('Test terminated.')

# The following conditions to completly test this method (not for Gitlab CI/CD)
# 1. A KPI Descriptor must be added in KPI DB with correct device_id. (gNMI OpenConfig driver)
# 2. A collector must be created for the KPI ID.
@@ -98,6 +143,7 @@ def telemetry_backend_service():
#         stop_event=""
#     )


# def test_helper_get_subscription_parameters(kpi_manager_client, context_client):
#     """
#     This test validates the correct retrieval of subscription parameters based on a KPI ID.
+2 −2
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
        LOGGER.info ("gRPC message: {:}".format(request))
        response = CollectorId()

        # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists?
        # INFO: If kpi_uuid is already in the DB, it will raise an exception.
        self.tele_db_obj.add_row_to_db(
            CollectorModel.ConvertCollectorToRow(request)
        )
@@ -133,7 +133,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
        except Exception as e:
            LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e))
        try:
            for row in rows:
            for row in rows:    # TODO: Tackle this warning.
                collector_obj = CollectorModel.ConvertRowToCollector(row)
                response.collector_list.append(collector_obj)
            return response