# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc import json import logging import threading from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub from confluent_kafka import KafkaError from confluent_kafka import Consumer as KafkaConsumer from kpi_manager.client.KpiManagerClient import KpiManagerClient from monitoring.service.NameMapping import NameMapping from kpi_manager.service.KpiManagerService import KpiManagerService LOGGER = logging.getLogger(__name__) ACTIVE_CONSUMERS = [] class KpiValueWriter: @staticmethod def RunKafkaConsumer(): thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() @staticmethod def KafkaConsumer(): kafka_consumer = KafkaConsumer( { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value, 'group.id' : __class__, 'auto.offset.reset' : 'latest'} ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: ".format(KafkaTopic.VALUE.value)) print("Kafka Consumer start listenng on topic: ".format(KafkaTopic.VALUE.value)) while True: raw_kpi = kafka_consumer.poll(1.0) if raw_kpi is None: continue elif raw_kpi.error(): if raw_kpi.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(raw_kpi.error())) continue try: kpi_value = KpiValue() kpi_value.ParseFromString(raw_kpi.value()) LOGGER.debug("Received KPI : {:}".format(kpi_value)) print("Received KPI : {:}".format(kpi_value)) KpiValueWriter.get_kpi_descriptor_from_db(kpi_value.kpi_id.kpi_id.uuid) except Exception as e: print("Error detail: {:}".format(e)) continue @staticmethod def get_kpi_descriptor_from_db(kpi_value_uuid: str): print("--- START -----") kpi_id = KpiId() kpi_id.kpi_id.uuid = kpi_value_uuid print("KpiId generated: {:}".format(kpi_id)) kpi_manager_client = KpiManagerClient() print("Kpi manger client created: {:}".format(kpi_manager_client)) try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) print("kpi descriptor received: {:}".format(kpi_descriptor_object)) if isinstance (kpi_descriptor_object, KpiDescriptor): LOGGER.debug("Extracted row: {:}".format(kpi_descriptor_object)) else: LOGGER.debug("Error in extracting row {:}".format(kpi_descriptor_object)) except Exception as e: print ("Unable to get Descriptor. Error: {:}".format(e))