From c2b96cf3a2b7a535eedf65d292dc57c4770d6cb3 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Tue, 3 Mar 2026 10:49:11 +0100 Subject: [PATCH 1/2] feat: enhance telemetry component with multiple service IP exports and subscription handling --- .../run_tests_locally-telemetry-backend.sh | 37 ++++- scripts/run_tests_locally-telemetry-gnmi.sh | 17 ++- .../backend/service/HelperMethods.py | 143 +++++++++++++----- .../service/TelemetryBackendService.py | 47 +++++- .../backend/service/collectors/__init__.py | 13 ++ .../gnmi_oc/GnmiOpenConfigCollector.py | 35 +++-- .../backend/service/collectors/gnmi_oc/KPI.py | 15 +- src/telemetry/backend/tests/Fixtures.py | 9 +- .../backend/tests/gnmi_oc/messages.py | 17 +-- .../test_integration_GnmiOCcollector.py | 44 +++++- src/telemetry/backend/tests/test_backend.py | 53 ++++--- 11 files changed, 323 insertions(+), 107 deletions(-) diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 485db89ca..6a6987020 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -19,11 +19,44 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc -export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" -RCFILE=$PROJECTDIR/coverage/.coveragerc +# RCFILE=$PROJECTDIR/coverage/.coveragerc + +export KFK_SERVER_ADDRESS='127.0.0.1:9094' + +IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}') +export IP_KPI +echo "KPI Manager Service IP: ${IP_KPI}" + +IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}') +export IP_TELE +echo "Telemetry Frontend Service IP: ${IP_TELE}" + +IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}') +export IP_CONTEXT +echo "Context Service IP: ${IP_CONTEXT}" + +# Start Kafka port-forward in background +kubectl port-forward -n kafka service/kafka-public 9094:9094 > /dev/null 2>&1 & +KAFKA_PF_PID=$! + +# Function to cleanup port-forward on exit +cleanup() { + # echo "Cleaning up Kafka port-forward (PID: ${KAFKA_PF_PID})..." + kill ${KAFKA_PF_PID} 2>/dev/null || true + wait ${KAFKA_PF_PID} 2>/dev/null || true + sleep 1 +} + +trap cleanup EXIT INT TERM +echo "Waiting for Kafka port-forward to be ready..." +sleep 1 +# Verify port-forward is working +if ! nc -z 127.0.0.1 9094 2>/dev/null; then + echo "WARNING: Kafka port-forward may not be ready yet" +fi python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-telemetry-gnmi.sh b/scripts/run_tests_locally-telemetry-gnmi.sh index a3a5f2b9d..af2031d65 100755 --- a/scripts/run_tests_locally-telemetry-gnmi.sh +++ b/scripts/run_tests_locally-telemetry-gnmi.sh @@ -19,9 +19,22 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc export KFK_SERVER_ADDRESS='127.0.0.1:9094' + +IP_KPI=$(kubectl get all --all-namespaces | grep service/kpi-managerservice | awk '{print $4}') +export IP_KPI +echo "KPI Manager Service IP: ${IP_KPI}" + +IP_TELE=$(kubectl get all --all-namespaces | grep service/telemetryservice | awk '{print $4}') +export IP_TELE +echo "Telemetry Frontend Service IP: ${IP_TELE}" + +IP_CONTEXT=$(kubectl get all --all-namespaces | grep service/contextservice | awk '{print $4}') +export IP_CONTEXT +echo "Context Service IP: ${IP_CONTEXT}" + # This is unit test (should be tested with container-lab running) -python3 -m pytest --log-level=info --log-cli-level=info --verbose \ - telemetry/backend/tests/gnmi_oc/test_unit_GnmiOpenConfigCollector.py +# python3 -m pytest --log-level=info --log-cli-level=info --verbose \ +# telemetry/backend/tests/gnmi_oc/test_unit_GnmiOpenConfigCollector.py::test_full_workflow # This is integration test (should be tested with container-lab running) python3 -m pytest --log-level=info --log-cli-level=info --verbose \ diff --git a/src/telemetry/backend/service/HelperMethods.py b/src/telemetry/backend/service/HelperMethods.py index 2d57917c1..8c8a988ee 100644 --- a/src/telemetry/backend/service/HelperMethods.py +++ b/src/telemetry/backend/service/HelperMethods.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json import uuid import logging from .collector_api._Collector import _Collector @@ -20,48 +21,72 @@ from .collectors.int_collector.INTCollector import INTCollector from common.proto.kpi_manager_pb2 import KpiId from common.tools.context_queries.Device import get_device from common.tools.context_queries.EndPoint import get_endpoint_names +from common.tools.context_queries.Service import get_service_by_uuid + from typing import List, Tuple, Optional +from .collectors.gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector LOGGER = logging.getLogger(__name__) -def get_subscription_parameters( - kpi_id : str, kpi_manager_client, context_client, duration, interval - ) -> Optional[List[Tuple]]: +def get_optical_subscription( + kpi_id: str, kpi_descriptor, context_client, duration: float, interval: float, resource: str, +) -> Optional[List[Tuple]]: """ - Method to get subscription parameters based on KPI ID. - Returns a list of tuples with subscription parameters. - Each tuple contains: - - Subscription ID (str) - - Dictionary with: - - "kpi" (str): KPI ID - - "endpoint" (str): Endpoint name (e.g., 'eth0') - - "resource" (str): Resource type (e.g., 'interface') - - Sample interval (float) - - Report interval (float) - If the KPI ID is not found or the device is not available, returns None. - Preconditions: - - A KPI Descriptor must be added in KPI DB with correct device_id. - - The device must be available in the context. + Get subscription parameters for optical devices (optical-roadm, optical-transponder) + using service uuid from KPI descriptor to find the endpoint in service config rules. + Returns: + List of subscription tuples or None if unable to create subscription """ - kpi_id_obj = KpiId() - kpi_id_obj.kpi_id.uuid = kpi_id # pyright: ignore[reportAttributeAccessIssue] - kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) - if not kpi_descriptor: - LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") + _service_id = kpi_descriptor.service_id.service_uuid.uuid + LOGGER.info(f"KPI Descriptor (Service ID): {_service_id}") + + service = get_service_by_uuid(context_client, _service_id) + if not service: + LOGGER.warning(f"KPI ID: {kpi_id} - Service not found for Service ID: {_service_id}. Skipping...") + return None + + LOGGER.debug(f"Service for KPI ID: {kpi_id} - {service.name} - {service.service_config}") + if not service.service_config.config_rules: + LOGGER.warning(f"KPI ID: {kpi_id} - No config rules in service config. Skipping...") return None - kpi_sample_type = kpi_descriptor.kpi_sample_type - LOGGER.info(f"KPI Descriptor (KPI Sample Type): {kpi_sample_type}") + # Get the first config rule's custom resource_value + config_rule = service.service_config.config_rules[0] + if not config_rule.HasField('custom'): + LOGGER.warning(f"KPI ID: {kpi_id} - No custom config in service config. Skipping...") + return None - device = get_device( context_client = context_client, - device_uuid = kpi_descriptor.device_id.device_uuid.uuid, - include_config_rules = False, - include_components = False - ) - if not device: - raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.") - endpoints = device.device_endpoints + resource_value = json.loads(config_rule.custom.resource_value) + + if 'ob_id' not in resource_value: + LOGGER.warning(f"KPI ID: {kpi_id} - Resource ob_id not found in service config. Skipping...") + return None + + endpoint = resource_value['ob_id'] + return [ + ( + str(uuid.uuid4()), + { + "kpi" : kpi_descriptor.kpi_sample_type, + "endpoint" : endpoint, + "resource" : resource, + }, + float(duration), + float(interval), + ) + ] + +def get_ip_subscriptions( + kpi_id: str, kpi_descriptor, device, context_client, duration: float, interval: float, resource: str, +) -> Optional[List[Tuple]]: + """ + Get subscription parameters for IP/packet devices (routers, switches, etc.) + using device endpoints from context to create subscriptions for each endpoint. + Returns: + List of subscription tuples (one per endpoint) or None if unable to create subscriptions + """ + endpoints = device.device_endpoints LOGGER.debug(f"Device for KPI ID: {kpi_id} - {endpoints}") endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints] for endpoint_id in endpoints: @@ -76,17 +101,16 @@ def get_subscription_parameters( LOGGER.debug(f"Endpoint data: {endpoint_data}") subscriptions = [] - sub_id = None for endpoint in endpointsIds: - sub_id = str(uuid.uuid4()) # Generate a unique subscription ID + sub_id = str(uuid.uuid4()) LOGGER.info(f"Endpoint names only: {endpoint_data[endpoint.endpoint_uuid.uuid][0]}") subscriptions.append( ( - sub_id, # Example subscription ID + sub_id, { - "kpi" : kpi_sample_type, # As request is based on the single KPI so it should have only one endpoint - "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0], # Endpoint name - "resource" : 'interface', # Example resource type + "kpi" : kpi_descriptor.kpi_sample_type, + "endpoint" : endpoint_data[endpoint.endpoint_uuid.uuid][0], + "resource" : resource, }, float(duration), float(interval), @@ -94,6 +118,47 @@ def get_subscription_parameters( ) return subscriptions + +def get_subscription_parameters( + kpi_id : str, kpi_manager_client, context_client, duration, interval, resource: str = "", + ) -> Optional[List[Tuple]]: + """ + Method to get subscription parameters based on KPI ID. + Returns a list of tuples with subscription parameters. + Each tuple contains: + - Subscription ID (str) + - Dictionary with: + - "kpi" (str): KPI ID + - "endpoint" (str): Endpoint name (e.g., 'eth0') + - "resource" (str): Resource type (e.g., 'interface') + - Sample interval (float) + - Report interval (float) + If the KPI ID is not found or the device is not available, returns None. + Preconditions: + - A KPI Descriptor must be added in KPI DB with correct device_id. + - The device must be available in the context. + """ + kpi_id_obj = KpiId() + kpi_id_obj.kpi_id.uuid = kpi_id + kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) + if not kpi_descriptor: + LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") + return None + + device = get_device(context_client = context_client, + device_uuid = kpi_descriptor.device_id.device_uuid.uuid, + include_config_rules = False, + include_components = False + ) + if not device: + raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.") + + # Route to appropriate subscription handler based on device type + if device.device_type in ['optical-roadm', 'optical-transponder']: + return get_optical_subscription(kpi_id, kpi_descriptor, context_client, duration, interval, resource="wavelength-router") + else: + return get_ip_subscriptions(kpi_id, kpi_descriptor, device, context_client, duration, interval, resource="interface") + def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache ) -> Optional[_Collector]: """ @@ -109,7 +174,7 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri LOGGER.info(f"Getting collector for KPI ID: {kpi_id}") kpi_id_obj = KpiId() kpi_id_obj.kpi_id.uuid = kpi_id # pyright: ignore[reportAttributeAccessIssue] - kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) + kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) if not kpi_descriptor: raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.") diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index d4e99f300..5c01d77ef 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -52,8 +52,8 @@ class TelemetryBackendService(GenericGrpcService): 'auto.offset.reset' : 'latest'}) self.driver_instance_cache = driver_instance_cache self.device_collector = None - self.context_client = ContextClient() - self.kpi_manager_client = KpiManagerClient() + self.context_client = ContextClient(host="10.152.183.28", port=1010) + self.kpi_manager_client = KpiManagerClient(host="10.152.183.25", port=30010) self.active_jobs = {} def install_servicers(self): @@ -144,6 +144,15 @@ class TelemetryBackendService(GenericGrpcService): ) return # Rest of the collectors + # elif context_id == "43813baf-195e-5da6-af20-b3d0922e71a7": + # self.device_collector = get_mgon_collector( + # address = collector['host'], # "172.17.254.24", + # port = collector['port'], # 50061, + # username = collector['username'], # "admin", + # password = collector['password'], # "admin", + # insecure = collector.get('insecure', True), + # skip_verify = collector.get('skip_verify', True) + # ) else: self.device_collector = get_collector_by_kpi_id( kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache) @@ -156,6 +165,12 @@ class TelemetryBackendService(GenericGrpcService): resource_to_subscribe = get_subscription_parameters( kpi_id, self.kpi_manager_client, self.context_client, duration, interval ) + # TODO: Remove after confirming get_subscription_parameters generic is working correctly + # resource_to_subscribe = get_mgon_subscription_parameters( + # collector['resource'], collector['endpoint'], collector['kpi'], + # collector['duration'], collector['sample_interval'] + # ) + if not resource_to_subscribe: LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...") raise Exception(f"KPI ID: {kpi_id} - Resource to subscribe not found.") @@ -170,11 +185,29 @@ class TelemetryBackendService(GenericGrpcService): for samples in self.device_collector.GetState(duration=duration, blocking=True): LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}") - self.GenerateKpiValue(collector_id, kpi_id, samples) - - # TODO: Stop_event should be managed in this method because there will be no more specific collector - if stop_event.is_set(): - self.device_collector.Disconnect() + if isinstance(samples, dict): + self.GenerateKpiValue(collector_id, kpi_id, sample_value) + # inn_dict = samples.get('update', {}) + # LOGGER.info(f"KPI ID: {kpi_id} - Inner Dictionary: {inn_dict}") + # list_update = inn_dict.get('update', []) + # LOGGER.info(f"KPI ID: {kpi_id} - List Update: {list_update}") + # if len(list_update) > 0: + # sample_value = list_update[0].get('val') + # self.GenerateKpiValue(collector_id, kpi_id, sample_value) + ''' +{ + 'update': { + 'timestamp': 1772103489806507669, + 'update': + [ + {'path': 'openconfig-wavelength-router:wavelength-router/flex-scale-mg-on:optical-bands/optical-band[index=2]/state/optical-power-total-input/instant', 'val': -2.51} + ] + } + } + ''' + # TODO: Stop_event should be managed in this method because there will be no more specific collector + if stop_event.is_set(): + self.device_collector.Disconnect() def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ diff --git a/src/telemetry/backend/service/collectors/__init__.py b/src/telemetry/backend/service/collectors/__init__.py index 6b45dcd41..8e06fdeee 100644 --- a/src/telemetry/backend/service/collectors/__init__.py +++ b/src/telemetry/backend/service/collectors/__init__.py @@ -56,3 +56,16 @@ if LOAD_ALL_DEVICE_DRIVERS: } ]) ) + +if LOAD_ALL_DEVICE_DRIVERS: + from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position + COLLECTORS.append( + (GNMIOpenConfigCollector, [ + { + FilterFieldEnum.DEVICE_TYPE: [ + DeviceTypeEnum.OPTICAL_ROADM, + DeviceTypeEnum.OPTICAL_TRANSPONDER + ], + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_OC, + } + ])) diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py index 31cdce39a..e51f3ac35 100644 --- a/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py +++ b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py @@ -65,7 +65,10 @@ class GNMIOpenConfigCollector(_Collector): insecure=self.insecure ) # self.logger.info("Connecting to gNMI target %s:%s with %s and %s", self.address, self.port, self.username, self.password) - self.client.connect() # type: ignore + if self.client is None: + self.logger.error("Failed to create gNMI client for target %s:%s", self.address, self.port) + return False + self.client.connect() # TO be enable in test. type: ignore self.connected = True self.logger.info("Connected to gNMI target %s:%s", self.address, self.port) return True @@ -110,20 +113,22 @@ class GNMIOpenConfigCollector(_Collector): kpi=sub_endpoint['kpi'], resource=sub_endpoint['resource'], ) - - self._subscriptions[sub_id] = Subscription( - sub_id = sub_id, - gnmi_client = self.client, # type: ignore - path_list = paths, # <- list of paths - metric_queue = self._output_queue, - mode = 'stream', # Default mode - sample_interval_ns = int(interval * 1_000_000_000), # Convert seconds to nanoseconds - heartbeat_interval_ns = int(duration * 1_000_000_000), # Convert seconds to nanoseconds - encoding = 'json_ietf', # Default encoding - ) - - self.logger.info("Subscribing to %s with job_id %s ...", sub_endpoint, sub_id) - response.append(True) + self.logger.debug("Built %d candidate path(s) for endpoint '%s'", len(paths), paths) + if self.connected and self.client: + self._subscriptions[sub_id] = Subscription( + sub_id = sub_id, + gnmi_client = self.client, # type: ignore + path_list = paths, # <- list of paths + metric_queue = self._output_queue, + mode = 'sample', # Entry mode: sample/on_change/target_defined + sample_interval_ns = int(interval * 1_000_000_000), # Convert seconds to nanoseconds + total_duration = duration, + encoding = 'json', # Use 'json' encoding (not 'json_ietf') + ) + self.logger.info("Subscribing to %s with job_id %s ...", sub_endpoint, sub_id) + response.append(True) + else: + raise ConnectionError("Not connected to gNMI target.") except: self.logger.exception("Invalid subscription format: %s", subscription) response.append(False) diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py b/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py index 7281c8a2e..c00e07eb3 100644 --- a/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py +++ b/src/telemetry/backend/service/collectors/gnmi_oc/KPI.py @@ -18,11 +18,12 @@ from enum import IntEnum, unique @unique class KPI(IntEnum): # TODO: verify KPI names and codes with KPI proto file. (How many TFS supports) """Generic KPI codes that map to interface statistics.""" - PACKETS_TRANSMITTED = 101 - PACKETS_RECEIVED = 102 - PACKETS_DROPPED = 103 - BYTES_TRANSMITTED = 201 - BYTES_RECEIVED = 202 - BYTES_DROPPED = 203 - INBAND_POWER = 301 + KPISAMPLETYPE_PACKETS_TRANSMITTED = 101 + KPISAMPLETYPE_PACKETS_RECEIVED = 102 + KPISAMPLETYPE_PACKETS_DROPPED = 103 + KPISAMPLETYPE_BYTES_TRANSMITTED = 201 + KPISAMPLETYPE_BYTES_RECEIVED = 202 + KPISAMPLETYPE_BYTES_DROPPED = 203 + KPISAMPLETYPE_INBAND_POWER = 301 + KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER = 503 # TODO: Add more KPIs as needed, diff --git a/src/telemetry/backend/tests/Fixtures.py b/src/telemetry/backend/tests/Fixtures.py index 4fd225136..066369e9b 100644 --- a/src/telemetry/backend/tests/Fixtures.py +++ b/src/telemetry/backend/tests/Fixtures.py @@ -14,12 +14,17 @@ import pytest import logging +import os from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient from service.client.ServiceClient import ServiceClient from kpi_manager.client.KpiManagerClient import KpiManagerClient +# Import ENV variables +_ip_kpi_address = os.getenv('IP_KPI', None) +_ip_tele_address = os.getenv('IP_TELE', None) +_ip_context_address = os.getenv('IP_CONTEXT', None) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -27,7 +32,7 @@ LOGGER.setLevel(logging.DEBUG) @pytest.fixture(scope='session') def context_client(): - _client = ContextClient(host="10.152.183.180") + _client = ContextClient(host=_ip_context_address) _client.connect() LOGGER.info('Yielding Connected ContextClient...') yield _client @@ -54,7 +59,7 @@ def service_client(): @pytest.fixture(scope='session') def kpi_manager_client(): - _client = KpiManagerClient(host="10.152.183.108") + _client = KpiManagerClient(host=_ip_kpi_address) _client.connect() LOGGER.info('Yielding Connected KpiManagerClient...') yield _client diff --git a/src/telemetry/backend/tests/gnmi_oc/messages.py b/src/telemetry/backend/tests/gnmi_oc/messages.py index d68d2dde3..2f6823e35 100644 --- a/src/telemetry/backend/tests/gnmi_oc/messages.py +++ b/src/telemetry/backend/tests/gnmi_oc/messages.py @@ -65,15 +65,14 @@ def creat_basic_sub_request_parameters( def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf447777" _create_kpi_request.kpi_description = descriptor_name - _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - # _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4()) - _create_kpi_request.device_id.device_uuid.uuid = "a8695f53-ba2e-57bd-b586-edf2b5e054b1" - _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4()) - _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' - _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_OPTICAL_TOTAL_INPUT_POWER + _create_kpi_request.device_id.device_uuid.uuid = "ddb3ef8e-ee65-5cf9-9d21-dac56a27f85b" # confirm for TFS + _create_kpi_request.service_id.service_uuid.uuid = "fd7a7b7d-4cd1-5453-bf79-47c7b53c31da" + # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' + # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = "END1" + # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' + # _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' return _create_kpi_request + diff --git a/src/telemetry/backend/tests/gnmi_oc/test_integration_GnmiOCcollector.py b/src/telemetry/backend/tests/gnmi_oc/test_integration_GnmiOCcollector.py index 3efb84621..0a05228ac 100644 --- a/src/telemetry/backend/tests/gnmi_oc/test_integration_GnmiOCcollector.py +++ b/src/telemetry/backend/tests/gnmi_oc/test_integration_GnmiOCcollector.py @@ -24,8 +24,9 @@ from common.proto.context_pb2 import TopologyId, ContextId, Empty from common.proto.kpi_manager_pb2 import KpiId from common.tools.context_queries.Topology import get_topology from common.tools.kafka.Variables import KafkaTopic +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor -from telemetry.backend.service.HelperMethods import get_collector_by_kpi_id +from telemetry.backend.service.HelperMethods import get_collector_by_kpi_id, get_subscription_parameters from telemetry.backend.service.collector_api.DriverFactory import DriverFactory from telemetry.backend.service.collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers from telemetry.backend.service.collectors import COLLECTORS @@ -171,14 +172,47 @@ def test_helper_get_collector_by_kpi_id(kpi_manager_client, context_client): driver_factory = DriverFactory(COLLECTORS) driver_instance_cache = DriverInstanceCache(driver_factory) - - kpi_id = "6e22f180-ba28-4641-b190-2287bf447777" + + _kpi_descriptor = create_kpi_descriptor_request() + kpi_id = _kpi_descriptor.kpi_id + + try: + response = kpi_manager_client.GetKpiDescriptor(kpi_id) + if isinstance(response, KpiDescriptor): + LOGGER.info("KPI Descriptor already exists with ID: %s. Deleting it for clean testing.", kpi_id.kpi_id.uuid) + kpi_manager_client.DeleteKpiDescriptor(kpi_id) + except Exception as e: + LOGGER.info("No existing KPI Descriptor found with ID: %s. Proceeding to create it. Error: %s", kpi_id.kpi_id.uuid, str(e)) + + # _kpi_descriptor = create_kpi_descriptor_request() + response = kpi_manager_client.SetKpiDescriptor(_kpi_descriptor) + LOGGER.info("KPI Descriptor created with ID: %s", response.kpi_id.uuid) + + + kpi_uuid = kpi_id.kpi_id.uuid collector = get_collector_by_kpi_id( - kpi_id, + kpi_uuid, kpi_manager_client, context_client, driver_instance_cache ) assert collector is not None assert isinstance(collector, GNMIOpenConfigCollector) - LOGGER.info(f"Collector for KPI ID {kpi_id} found: {collector.__class__.__name__}") + LOGGER.info(f"Collector for KPI ID {kpi_uuid} found: {collector.__class__.__name__}") + + LOGGER.info("Testing get_subscription_parameters...") + resource = 'wavelength-router' + duration = 10.0 + interval = 5.0 + subscriptions = get_subscription_parameters( + kpi_uuid, + kpi_manager_client, + context_client, + duration, + interval, + resource, + ) + assert subscriptions is not None + assert len(subscriptions) > 0 + LOGGER.info(f"Subscription parameters for KPI ID {kpi_uuid}: {subscriptions}") + diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 350ffac80..68fbd6a1a 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -55,7 +55,7 @@ def publish_request_on_kafka(collector_obj, collector_uuid): value = json.dumps(collector_obj), callback = delivery_callback ) - LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_obj)) + LOGGER.info("Message sent to Kafka: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_obj)) kafka_producer.flush() def delivery_callback(err, msg): @@ -108,24 +108,39 @@ def telemetry_backend_service(): # 1. Start telemetry backend service # 2. Generate request on Kafka -# def test_collector_lifecycle( -# telemetry_backend_service, kpi_manager_client, context_client -# ): -# time.sleep(5) # Wait for the service to start -# LOGGER.info('------ Waiting time for telemetry backend service is finished ------') -# # Create a collector request -# collector_obj : Dict = { -# "kpi_id": "6e22f180-ba28-4641-b190-2287bf447777", -# "duration": 10.0, -# "interval": 3.0 -# } -# collector_id = "6e22f180-ba28-4641-b190-2287bf444444" -# publish_request_on_kafka(collector_obj, collector_id) -# # Wait for the collector to be created and started -# LOGGER.info('Waiting for collector to be created and started...') -# time.sleep(30) # Adjust the sleep time as needed for your environment - -# LOGGER.info('Test terminated.') +def test_collector_lifecycle( + telemetry_backend_service, kpi_manager_client, context_client + ): + LOGGER.info('waiting 10 seconds for telemetry backend service to start...') + time.sleep(10) # Wait for the service to start + # LOGGER.info('------ Waiting time for telemetry backend service is finished ------') + # Create a collector request + collector_obj : Dict = { + "kpi_id" : "6e22f180-ba28-4641-b190-2287bf447777", + "interface" : "", + "transport_port" : "", + "service_id" : "", + "context_id" : "", + "duration" : 30.0, + "interval" : 7.0 + } + collector_id = "6e22f180-ba28-4641-b190-2287bf444444" + + publish_request_on_kafka(collector_obj, collector_id) + + # Wait for the collector to be created and started + LOGGER.info('Waiting for collector to be created and started...') + time.sleep(35) # Adjust the sleep time as needed for your environment + + stop_collector_obj : Dict = { + "kpi_id": "6e22f180-ba28-4641-b190-2287bf447777", + "duration": -1.0, + "interval": -1.0, + } + LOGGER.info('Publishing stop request for collector...') + publish_request_on_kafka(stop_collector_obj, collector_id) + + LOGGER.info('Test terminated.') # The following conditions to completly test this method (not for Gitlab CI/CD) # 1. A KPI Descriptor must be added in KPI DB with correct device_id. (gNMI OpenConfig driver) -- GitLab From 2b0d9d77a1b1917efd719fb2e025484b42282584 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Thu, 2 Apr 2026 12:22:04 +0000 Subject: [PATCH 2/2] pre-merge cleanup --- src/telemetry/backend/service/TelemetryBackendService.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 33429e194..dd2965ce5 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -52,8 +52,8 @@ class TelemetryBackendService(GenericGrpcService): 'auto.offset.reset' : 'latest'}) self.driver_instance_cache = driver_instance_cache self.device_collector = None - self.context_client = ContextClient(host="10.152.183.28", port=1010) - self.kpi_manager_client = KpiManagerClient(host="10.152.183.25", port=30010) + self.context_client = ContextClient() + self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} def install_servicers(self): -- GitLab