diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 5d1a98808fd5ed07a388a7f20b73bf67edd66135..eba6516747bfe3b8a8e983967d47ec21d26bc8a4 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -41,18 +41,18 @@ class KpiValueWriter(GenericGrpcService): @staticmethod def RunKafkaConsumer(): - thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=()) + thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=()) ACTIVE_CONSUMERS.append(thread) thread.start() @staticmethod - def KafkaConsumer(): + def KafkaKpiConsumer(): kpi_manager_client = KpiManagerClient() metric_writer = MetricWriterToPrometheus() kafka_consumer = KafkaConsumer( { 'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : __class__, + 'group.id' : 'KpiValueWriter', 'auto.offset.reset' : 'latest'} ) kafka_consumer.subscribe([KafkaTopic.VALUE.value]) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index 4e61062553405e3852b5ba32e2dd868ae0a1728b..a86b8f34e462a2703f5dfa4ece947613978bdef9 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,11 +14,11 @@ # read Kafka stream from Kafka topic -import ast -import time -import threading +# import ast +# import time +# import threading import logging -from prometheus_client import Gauge, CollectorRegistry +from prometheus_client import Gauge from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_value_api_pb2 import KpiValue diff --git a/src/telemetry/backend/requirements.in b/src/telemetry/backend/requirements.in index e6a559be714faa31196206dbbdc53788506369b5..1d22df11b4032c05ba851e9d64e5ca3786ecc461 100644 --- a/src/telemetry/backend/requirements.in +++ b/src/telemetry/backend/requirements.in @@ -13,3 +13,7 @@ # limitations under the License. confluent-kafka==2.3.* +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* \ No newline at end of file diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 991298d370abf082d08880fea1c870c5192f37c9..bb9f0a3147f1d814a137d0b5338596fe8908d3d4 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -18,10 +18,12 @@ import random import logging import threading from typing import Any, Dict -from common.proto.context_pb2 import Empty +# from common.proto.context_pb2 import Empty from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import KafkaError +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.method_wrappers.Decorator import MetricsPool from common.tools.service.GenericGrpcService import GenericGrpcService @@ -38,8 +40,10 @@ class TelemetryBackendService(GenericGrpcService): Class listens for request on Kafka topic, fetches requested metrics from device. Produces metrics on both RESPONSE and VALUE kafka topics. """ - def __init__(self): + def __init__(self, cls_name : str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') + port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) + super().__init__(port, cls_name=cls_name) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend',