Commit 7353c445 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Some minor updates in Telemetry main file to gNMI-OC changes

- updated imports
- Added logger condition to remove warning messages
- change DRIVERS with COLLECTORS
parent 35c0ee04
Loading
Loading
Loading
Loading
+48 −0
Original line number Diff line number Diff line
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import get_driver
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device

LOGGER = logging.getLogger(__name__)

def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache
                            ) -> _Collector | None:
    """
    Method to get device_type and endpoint detail based on kpi_id.
    """
    LOGGER.info(f"Getting collector for KPI ID: {kpi_id}")
    kpi_id_obj = KpiId()
    kpi_id_obj.kpi_id.uuid = kpi_id              # pyright: ignore[reportAttributeAccessIssue]
    kpi_descriptor     = kpi_manager_client.GetKpiDescriptor(kpi_id_obj)
    LOGGER.info(f"KPI Descriptor: {kpi_descriptor}")
    if not kpi_descriptor:
        raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.")
    
    # device_uuid       = kpi_descriptor.device_id.device_uuid.uuid
    device = get_device( context_client       = context_client,
                         device_uuid          = kpi_descriptor.device_id.device_uuid.uuid,
                         include_config_rules = True,
                         include_components   = False,
                         )

    # Getting device collector (testing)
    collector : _Collector = get_driver(driver_instance_cache, device)
    if collector is None:
        raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.")
    LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}")
    return collector
+17 −14
Original line number Diff line number Diff line
@@ -16,25 +16,27 @@ import json
import time
import logging
import threading
from typing           import Any, Dict, Tuple
from datetime         import datetime, timezone
from confluent_kafka  import Producer as KafkaProducer
from confluent_kafka  import Consumer as KafkaConsumer
from confluent_kafka  import KafkaError

from .HelperMethods   import get_collector_by_kpi_id
from common.Constants import ServiceNameEnum
from common.Settings  import get_service_port_grpc
from common.method_wrappers.Decorator        import MetricsPool
from common.tools.kafka.Variables            import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.context_queries.Device     import get_device
from common.proto.kpi_manager_pb2            import KpiId
from confluent_kafka  import Consumer as KafkaConsumer
from confluent_kafka  import KafkaError
from confluent_kafka  import Producer as KafkaProducer
from datetime         import datetime, timezone
from typing           import Any, Dict

from .collector_api._Collector               import _Collector
from .collector_api.DriverInstanceCache      import DriverInstanceCache, get_driver
from kpi_manager.client.KpiManagerClient     import KpiManagerClient
from .collectors.emulated.EmulatedCollector  import EmulatedCollector
from common.method_wrappers.Decorator        import MetricsPool
from common.proto.kpi_manager_pb2            import KpiId
from common.tools.context_queries.Device     import get_device
from common.tools.kafka.Variables            import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.client.ContextClient            import ContextClient
from telemetry.backend.service.collectors.emulated.EmulatedCollector import EmulatedCollector
from kpi_manager.client.KpiManagerClient     import KpiManagerClient


LOGGER       = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
@@ -53,7 +55,8 @@ class TelemetryBackendService(GenericGrpcService):
                                            'group.id'           : 'backend',
                                            'auto.offset.reset'  : 'latest'})
        self.driver_instance_cache = driver_instance_cache
        self.collector             = None
        self.device_collector      = None
        self.collector             = None           # This should be replaced with device_collector (later to be removed)
        self.context_client        = ContextClient()
        self.kpi_manager_client    = KpiManagerClient()
        self.active_jobs = {}
@@ -97,7 +100,7 @@ class TelemetryBackendService(GenericGrpcService):
                    if collector_id not in self.active_jobs:
                        stop_event = threading.Event()
                        self.active_jobs[collector_id] = stop_event
                        threading.Thread(target = self.CollectorHandler, 
                        threading.Thread(target = self.GenericCollectorHandler, 
                                    args=(
                                        collector_id, 
                                        collector['kpi_id'],
+6 −4
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# limitations under the License.

import logging, signal, sys, threading
from typing import Optional
from prometheus_client.exposition import start_http_server
from common.Settings import get_log_level, get_metrics_port
from common.Constants import ServiceNameEnum
@@ -21,12 +22,13 @@ from .TelemetryBackendService import TelemetryBackendService

from .collector_api.DriverFactory import DriverFactory
from .collector_api.DriverInstanceCache import DriverInstanceCache, preload_drivers
from .collectors import DRIVERS
from .collectors import COLLECTORS

terminate = threading.Event()
LOGGER : logging.Logger = None
LOGGER : Optional[logging.Logger] = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    if LOGGER:
       LOGGER.warning('Terminate signal received')
    terminate.set()

@@ -49,7 +51,7 @@ def main():
    start_http_server(metrics_port)

    # Initialize Driver framework
    driver_factory = DriverFactory(DRIVERS)
    driver_factory = DriverFactory(COLLECTORS)
    driver_instance_cache = DriverInstanceCache(driver_factory)

    grpc_service = TelemetryBackendService(driver_instance_cache)