Commit 46e5f9de authored by Andrea Sgambelluri's avatar Andrea Sgambelluri
Browse files

telemetry fix

parent 7b0bc3cb
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -136,7 +136,7 @@ class TelemetryBackendService(GenericGrpcService):
        """
        Method to handle collector request.
        """

        LOGGER.info(f"Starting Collector Handler for Collector ID: {collector_id} - KPI ID: {kpi_id}")
        # INT collector invocation
        if interface:
            self.device_collector = get_node_level_int_collector(
+6 −9
Original line number Diff line number Diff line


import uuid
import json
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from src.telemetry.backend.service.collectors.gnmi_oc.KPI import KPI
@@ -17,7 +17,7 @@ def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
    _create_kpi_request.device_id.device_uuid.uuid         = "ddb3ef8e-ee65-5cf9-9d21-dac56a27f85b"         # confirm for TFS
    _create_kpi_request.service_id.service_uuid.uuid       = "b2a60c5b-8c46-5707-a64a-9c6539d395f2"
    # _create_kpi_request.slice_id.slice_uuid.uuid           = 'SLC1'
    # _create_kpi_request.endpoint_id.endpoint_uuid.uuid     = str(uuid.uuid4())
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid     = "<>"
    # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' 
    # _create_kpi_request.link_id.link_uuid.uuid             = 'LNK1' 
    return _create_kpi_request
@@ -35,7 +35,7 @@ devices = {
        'kpi'        : KPI.KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER,
        #'resource': 'oc-wave-router:wavelength-router/fsmgon:optical-bands/optical-band[index=4]/state/optical-power-total-input/instant',
        'resource'   : 'wavelength-router',            #TODO: verify resource name form mg-on model
        'endpoint'   : '4',
        'endpoint'   : '1',
        'skip_verify': True
    },
}
@@ -44,9 +44,6 @@ def create_basic_sub_request_parameters() -> dict:

    device = devices['mgon']
    if device:
        kpi      = device['kpi']
        resource = device['resource']
        endpoint = device['endpoint']
        return {
            'host'              : device['host'],
            'port'              : device['port'],
@@ -57,9 +54,9 @@ def create_basic_sub_request_parameters() -> dict:
            'mode'              : 'sample',             # Subscription internal mode posibly: on_change, poll, sample
            'sample_interval'   : 10,                   # This should be in seconds units
            'duration'          : 300.0,                # Duration in seconds for how long to receive samples
            'kpi'               : kpi,
            'resource'          : resource,
            'endpoint'          : endpoint,
            'kpi'               : device['kpi'],
            'resource'          : device['resource'],
            'endpoint'          : device['endpoint'],
        }
    return {}

+53 −21
Original line number Diff line number Diff line


import logging

import time
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.telemetry_frontend_pb2 import CollectorId
import time
import threading

from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
@@ -12,6 +13,9 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
from tests.ofc26_flexscale.test_ofc26_messages import create_kpi_descriptor_request, create_collector_request
from src.tests.ofc26_flexscale.test_ofc26_messages import create_basic_sub_request_parameters

from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService


WITH_TFS = True     #True/False
if WITH_TFS:
    from .Fixtures import kpi_manager_client, telemetry_frontend_client
@@ -63,26 +67,52 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client

    # 2. Telemetry Collector Creation
    
    # _collector_request = create_collector_request()
    # _search_collector_id = CollectorId()
    # _search_collector_id = _collector_request.collector_id
    # try:
    #     response_col = telemetry_frontend_client.StopCollector(_search_collector_id)
    #     LOGGER.info("Response gRPC message object: {:}".format(response_col))
    #     if response is not None:
    #         response = telemetry_frontend_client.StartCollector(_collector_request)
    #         LOGGER.info("Response gRPC message object: {:}".format(response))
    #         assert isinstance(response, CollectorId)
    # except Exception as e:
    #     LOGGER.info("Error finding the collector with ID: %s. Proceeding to create it.", _search_collector_id.collector_id.uuid)
    #     response = telemetry_frontend_client.StartCollector(_collector_request)
    #     LOGGER.info("Response gRPC message object: {:}".format(response))
    #     assert isinstance(response, CollectorId)
    
    # step 2: Telemetry Collector backup option
    _collector_request = create_collector_request()
    _search_collector_id = CollectorId()
    _search_collector_id = _collector_request.collector_id
    try:
        response_col = telemetry_frontend_client.StopCollector(_search_collector_id)
        LOGGER.info("Response gRPC message object: {:}".format(response_col))
        if response is not None:
            response = telemetry_frontend_client.StartCollector(_collector_request)
            LOGGER.info("Response gRPC message object: {:}".format(response))
            assert isinstance(response, CollectorId)
    except Exception as e:
        LOGGER.info("Error finding the collector with ID: %s. Proceeding to create it.", _search_collector_id.collector_id.uuid)
        response = telemetry_frontend_client.StartCollector(_collector_request)
        LOGGER.info("Response gRPC message object: {:}".format(response))
        assert isinstance(response, CollectorId)
    
    # sub_parameters = create_basic_sub_request_parameters()
    # LOGGER.info("Subscription parameters: %s", sub_parameters)


    _collector         = create_basic_sub_request_parameters()
    _coll_id           = "mgon_collector_id"
    LOGGER.info("Subscription for collector %s parameters: %s", _coll_id, _collector)
    
    _duration          = _collector_request.duration_s
    _interval          = _collector_request.interval_s
    
    stop_event       = threading.Event()
    collector_thread = threading.Thread(
        target=TelemetryBackendService.GenericCollectorHandler,
        args=(
            _coll_id, _collector, None, _duration, _interval,
            None, None, None, None, stop_event
        )
    )

    def stop_after_duration(completion_time, stop_event):
        time.sleep(completion_time)
        if not stop_event.is_set():
            LOGGER.warning(f"Execution duration ({completion_time}) completed for Collector: {_coll_id}")
            if stop_event:
                stop_event.set()

    duration_thread = threading.Thread(
        target=stop_after_duration, daemon=True, name=f"stop_after_duration_{_coll_id}",
        args=(_duration, stop_event)
    )
    duration_thread.start()
    # LOGGER.info("----- Testing State Subscription -----")
    
    # sub_data = [(
@@ -103,6 +133,8 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client

    LOGGER.info("Sleeping...")
    time.sleep(600)
    if stop_event:
        stop_event.set()
    LOGGER.info("Done sleeping.")
    LOGGER.info(" >>> test_Complete_MGON_Integration: END <<< ")

+19 −0
Original line number Diff line number Diff line
{
    "contexts": [
        {
            "context_id": {"context_uuid": {"uuid": "admin"}},
            "topology_ids": [],
            "service_ids": []
        }
    ],
    "topologies": [
        {
            "topology_id": {
                "context_id": {"context_uuid": {"uuid": "admin"}},
                "topology_uuid": {"uuid": "admin"}
            },
            "device_ids": [],
            "link_ids": []
        }
    ]
}
+1378 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading