Skip to content
Snippets Groups Projects
Commit 25f71f10 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes on KPI Value Writer and Telemetry Backend

- Renamed the method to "KafkaKpiConsumer" to avoid conflict with the "KafkaConsumer" import in KpiApiWriter.
- Removed unnecessary imports in KpiWriterToProm.
- Added `get_service_port_grpc` call and imports in the Telemetry backend service.
- Added new libraries to `requirements.in` for Telemetry.
parent d1c99c6b
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
......@@ -41,18 +41,18 @@ class KpiValueWriter(GenericGrpcService):
@staticmethod
def RunKafkaConsumer():
thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
thread = threading.Thread(target=KpiValueWriter.KafkaKpiConsumer, args=())
ACTIVE_CONSUMERS.append(thread)
thread.start()
@staticmethod
def KafkaConsumer():
def KafkaKpiConsumer():
kpi_manager_client = KpiManagerClient()
metric_writer = MetricWriterToPrometheus()
kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : __class__,
'group.id' : 'KpiValueWriter',
'auto.offset.reset' : 'latest'}
)
kafka_consumer.subscribe([KafkaTopic.VALUE.value])
......
......@@ -14,11 +14,11 @@
# read Kafka stream from Kafka topic
import ast
import time
import threading
# import ast
# import time
# import threading
import logging
from prometheus_client import Gauge, CollectorRegistry
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_value_api_pb2 import KpiValue
......
......@@ -13,3 +13,7 @@
# limitations under the License.
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
\ No newline at end of file
......@@ -18,10 +18,12 @@ import random
import logging
import threading
from typing import Any, Dict
from common.proto.context_pb2 import Empty
# from common.proto.context_pb2 import Empty
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.method_wrappers.Decorator import MetricsPool
from common.tools.service.GenericGrpcService import GenericGrpcService
......@@ -38,8 +40,10 @@ class TelemetryBackendService(GenericGrpcService):
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both RESPONSE and VALUE kafka topics.
"""
def __init__(self):
def __init__(self, cls_name : str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment