Commit 7b0bc3cb authored by Andrea Sgambelluri's avatar Andrea Sgambelluri
Browse files

fix for dynamic telemetry activation

parent eb80a6da
Loading
Loading
Loading
Loading
+9 −9
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ from .collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector
LOGGER = logging.getLogger(__name__)

def get_subscription_parameters(
        kpi_id : str, kpi_manager_client, context_client, duration, interval
        kpi_id : str, kpi_manager_client, context_client, resource, duration, interval
        ) -> Optional[List[Tuple]]:
    """
    Method to get subscription parameters based on KPI ID.
@@ -90,7 +90,7 @@ def get_subscription_parameters(
                {
                    "kpi"      : kpi_sample_type,   # As request is based on the single KPI so it should have only one endpoint
                    "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0],  # Endpoint name
                    "resource" : 'interface',  # Example resource type
                    "resource" : resource,          # Example resource type is 'interface' or 'wavelength-router' for MG-ON, this should be defined in the KPI Descriptor or as part of the request
                },
                float(duration),
                float(interval),
@@ -179,13 +179,13 @@ def get_node_level_int_collector(collector_id: str, kpi_id: str, address: str, i
    return collector if connected else None


def get_mgon_subscription_parameters(duration, interval) -> Optional[List[Tuple]]:
def get_mgon_subscription_parameters(resource: str, endpoint: str, kpi: str, duration: int, interval: int) -> Optional[List[Tuple]]:
        return [(
                "x123",
                str(uuid.uuid4()), # "x123",
                {
                    "kpi"      : KPI.KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER, # sub_parameters['kpi'],
                    "endpoint" : '4', # sub_parameters['endpoint'],
                    "resource" : 'wavelength-router', #sub_parameters['resource'],
                    "kpi"      : kpi,            # sub_parameters['kpi'],
                    "endpoint" : endpoint,       # sub_parameters['endpoint'],
                    "resource" : resource,       #sub_parameters['resource'],
                },
                duration,
                interval,
+20 −10
Original line number Diff line number Diff line
@@ -97,6 +97,10 @@ class TelemetryBackendService(GenericGrpcService):
                        threading.Thread(target=self.GenericCollectorHandler,
                                        args=(
                                            collector_id,
                                            collector,      # TODO: later all other collector['<key>'] should be removed.
                                                            # For now, to avoid multiple changes in the code,
                                                            # I am passing the whole collector dict and accessing the required parameters in the GenericCollectorHandler method.
                                                            # This will be changed after confirming the current implementation is working fine.
                                            collector['kpi_id'],
                                            duration,
                                            collector['interval'],
@@ -126,7 +130,9 @@ class TelemetryBackendService(GenericGrpcService):
                LOGGER.warning(
                    f"Unable to consume message from topic: {KafkaTopic.TELEMETRY_REQUEST.value}. ERROR: {e}")

    def GenericCollectorHandler(self, collector_id, kpi_id, duration, interval, interface, port, service_id, context_id, stop_event):
    def GenericCollectorHandler(self, 
                                collector_id, collector, kpi_id, duration, interval, interface, port,
                                service_id, context_id, stop_event):
        """
        Method to handle collector request.
        """
@@ -146,12 +152,12 @@ class TelemetryBackendService(GenericGrpcService):
        # Rest of the collectors
        elif context_id == "43813baf-195e-5da6-af20-b3d0922e71a7":
            self.device_collector = get_mgon_collector(
                address     = "172.17.254.24",
                port        =  50061,
                username    = "admin",
                password    = "admin",
                insecure    = True,
                skip_verify = True
                address     = collector['host'],            # "172.17.254.24",
                port        = collector['port'],            # 50061,
                username    = collector['username'],        # "admin",
                password    = collector['password'],        # "admin",
                insecure    = collector.get('insecure', True),
                skip_verify = collector.get('skip_verify', True)
            )
        else:
            self.device_collector = get_collector_by_kpi_id(
@@ -163,9 +169,13 @@ class TelemetryBackendService(GenericGrpcService):

        # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetry backend tests
        # resource_to_subscribe = get_subscription_parameters(
        #     kpi_id, self.kpi_manager_client, self.context_client, duration, interval
        #     kpi_id, self.kpi_manager_client, self.context_client, resource, duration, interval
        # )
        resource_to_subscribe = get_mgon_subscription_parameters(duration, interval)     # TODO: Remove after confirming get_subscription_parameters is working correctly
        # TODO: Remove after confirming get_subscription_parameters generic is working correctly
        resource_to_subscribe = get_mgon_subscription_parameters(
                collector['resource'], collector['endpoint'], collector['kpi'],
                collector['duration'], collector['sample_interval']
                )

        if not resource_to_subscribe:
            LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...")
+0 −0

Empty file deleted.

+9 −8
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ devices = {
        #'resource': 'oc-wave-router:wavelength-router/fsmgon:optical-bands/optical-band[index=4]/state/optical-power-total-input/instant',
        'resource'   : 'wavelength-router',            #TODO: verify resource name form mg-on model
        'endpoint'   : '4',
        'skip_verify': True
    },
}

+1 −1
Original line number Diff line number Diff line
@@ -74,7 +74,7 @@ def test_Complete_MGON_Integration(kpi_manager_client, telemetry_frontend_client
            LOGGER.info("Response gRPC message object: {:}".format(response))
            assert isinstance(response, CollectorId)
    except Exception as e:
        LOGGER.info("No existing Collector found with ID: %s. Proceeding to create it.", _search_collector_id.collector_id.uuid)
        LOGGER.info("Error finding the collector with ID: %s. Proceeding to create it.", _search_collector_id.collector_id.uuid)
        response = telemetry_frontend_client.StartCollector(_collector_request)
        LOGGER.info("Response gRPC message object: {:}".format(response))
        assert isinstance(response, CollectorId)