From b907871f4ab0f0cbfefdb23b8b323cecb3f5d325 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Sat, 10 Aug 2024 07:11:35 +0000 Subject: [PATCH] Changes on KPI Value Writer and Telemetry Backend - Renamed the method to "KafkaKpiConsumer" to avoid conflict with the "KafkaConsumer" import in KpiApiWriter. - Removed unnecessary imports in KpiWriterToProm. - Added `get_service_port_grpc` call and imports in the Telemetry backend service. - Added new libraries to `requirements.in` for Telemetry. --- src/kpi_value_writer/service/KpiValueWriter.py | 6 +++--- src/kpi_value_writer/service/MetricWriterToPrometheus.py | 8 ++++---- src/telemetry/backend/requirements.in | 4 ++++ src/telemetry/backend/service/TelemetryBackendService.py | 8 ++++++-- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 5d1a98808..eba651674 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -41,18 +41,18 @@ class KpiValueWriter(GenericGrpcService): @staticmethod def RunKafkaConsumer(): - thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) + thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() @staticmethod - def KafkaConsumer(): + def KafkaKpiConsumer(): kpi_manager_client = KpiManagerClient() metric_writer = MetricWriterToPrometheus() kafka_consumer = KafkaConsumer( { 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : __class__, + 'group.id' : 'KpiValueWriter', 'auto.offset.reset' : 'latest'} ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index 4e6106255..a86b8f34e 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,11 +14,11 @@ # read Kafka stream from Kafka topic -import ast -import time -import threading +# import ast +# import time +# import threading import logging -from prometheus_client import Gauge, CollectorRegistry +from prometheus_client import Gauge from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_value_api_pb2 import KpiValue diff --git a/src/telemetry/backend/requirements.in b/src/telemetry/backend/requirements.in index e6a559be7..1d22df11b 100644 --- a/src/telemetry/backend/requirements.in +++ b/src/telemetry/backend/requirements.in @@ -13,3 +13,7 @@ # limitations under the License. confluent-kafka==2.3.* +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* \ No newline at end of file diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 991298d37..bb9f0a314 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -18,10 +18,12 @@ import random import logging import threading from typing import Any, Dict -from common.proto.context_pb2 import Empty +# from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaError +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.method_wrappers.Decorator import MetricsPool from common.tools.service.GenericGrpcService import GenericGrpcService @@ -38,8 +40,10 @@ class TelemetryBackendService(GenericGrpcService): Class listens for request on Kafka topic, fetches requested metrics from device. Produces metrics on both RESPONSE and VALUE kafka topics. """ - def __init__(self): + def __init__(self, cls_name : str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') + port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) + super().__init__(port, cls_name=cls_name) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', -- GitLab