From 1c15a21abb8cd144f20704b9a8e9ccfccf446f46 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Tue, 14 May 2024 13:57:10 +0000
Subject: [PATCH] Added "Kafka_listener"

---
 .../TelemetryFrontendServiceServicerImpl.py   | 90 ++++++++++++++-----
 1 file changed, 69 insertions(+), 21 deletions(-)

diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index 518dd471d..045c56d5b 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -12,13 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import ast
+import threading
+import time
 from typing import Tuple
 import grpc
 import logging
+
+from confluent_kafka import Consumer as KafkaConsumer
 from common.proto.context_pb2 import Empty
 from monitoring.service.NameMapping import NameMapping
 from confluent_kafka import Producer as KafkaProducer
 from confluent_kafka import KafkaException
+from confluent_kafka import KafkaError
 from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer
@@ -38,49 +44,77 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
                       ) -> CollectorId: # type: ignore
         # push info to frontend db
         response = CollectorId()
-        _collector_id = request.collector_id
+        _collector_id = str(request.collector_id.collector_id.uuid)
         _collector_kpi_id = str(request.kpi_id.kpi_id.uuid)
         _collector_duration = int(request.duration_s)
         _collector_interval = int(request.interval_s)
-        activeCollObj = self.generate_kafka_request(str(_collector_id), _collector_kpi_id, _collector_duration, _collector_interval)
-
-        response.collector_id.uuid = _collector_id.collector_id.uuid
+        self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
+        # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
+        response.collector_id.uuid = request.collector_id.collector_id.uuid
         return response
 
-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def StopCollector(self,
-                      request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
-                     ) -> Empty: # type: ignore
-        request.collector_id.uuid = ""
-        return Empty()
+    def run_generate_kafka_request(self, msg_key: str, kpi: str, duration : int, interval: int):
+        threading.Thread(target=self.generate_kafka_request, args=(msg_key, kpi, duration, interval)).start()
 
-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def SelectCollectors(self,
-                         request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
-                        ) -> CollectorList: # type: ignore
-        response = CollectorList()
-        return response
-
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def generate_kafka_request(self,
-                               msg_key, kpi: str, duration : int, interval: int
+                               msg_key: str, kpi: str, duration : int, interval: int
                               ) -> KafkaProducer:
         """
         Method to generate collector request to Kafka topic.
""" + # time.sleep(5) producer_configs = { 'bootstrap.servers': KAFKA_SERVER_IP, - 'group.id' : 'requester', } topic_request = "topic_request" msg_value = Tuple [str, int, int] msg_value = (kpi, duration, interval) - + print ("Request generated: ", "Colletcor Id: ", msg_key, \ + ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) producerObj = KafkaProducer(producer_configs) producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) producerObj.flush() return producerObj + def run_kafka_listener(self): + # print ("--- STARTED: run_kafka_listener ---") + threading.Thread(target=self.kafka_listener).start() + return True + + def kafka_listener(self): + """ + listener for response on Kafka topic. + """ + # print ("--- STARTED: kafka_listener ---") + conusmer_configs = { + 'bootstrap.servers' : KAFKA_SERVER_IP, + 'group.id' : 'frontend', + 'auto.offset.reset' : 'latest' + } + topic_response = "topic_response" + + consumerObj = KafkaConsumer(conusmer_configs) + consumerObj.subscribe([topic_response]) + # print (time.time()) + while True: + receive_msg = consumerObj.poll(2.0) + if receive_msg is None: + print (" - Telemetry frontend listening on Kafka Topic: ", topic_response) # added for debugging purposes + continue + elif receive_msg.error(): + if receive_msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print("Consumer error: {}".format(receive_msg.error())) + break + (kpi_id, kpi_value) = ast.literal_eval(receive_msg.value().decode('utf-8')) + self.process_response(kpi_id, kpi_value) + # threading.Thread(target=self.process_response, args=(kpi_id, kpi_value)).start() + + def process_response(self, kpi_id: str, kpi_value: any): + print ("Frontend - KPI: ", kpi_id, ", VALUE: ", kpi_value) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): """ @@ -92,4 +126,18 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): if err: print(f'Message delivery failed: {err}') else: - print(f'Message delivered to topic {msg.topic()}') \ No newline at end of file + print(f'Message delivered to topic {msg.topic()}') + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StopCollector(self, + request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore + request.collector_id.uuid = "" + return Empty() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SelectCollectors(self, + request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorList: # type: ignore + response = CollectorList() + return response \ No newline at end of file -- GitLab