diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 498d07a91d43d28ce65bc5eaa980d44541348013..518dd471d39793cd31354e69d37835a155ea619a 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -12,42 +12,84 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Tuple import grpc import logging from common.proto.context_pb2 import Empty from monitoring.service.NameMapping import NameMapping +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import KafkaException from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') +KAFKA_SERVER_IP = '127.0.0.1:9092' class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init TelemetryFrontendService') - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore - ) -> CollectorId: # type: ignore + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def StartCollector(self, + request : Collector, grpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorId: # type: ignore + # push info to frontend db response = CollectorId() - _collector_id = request.collector_id - # collector_kpi_id = request.kpi_id - # collector_duration = request.duration_s - # collector_interval = request.interval_s + _collector_id = request.collector_id + _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) + _collector_duration = int(request.duration_s) + _collector_interval = int(request.interval_s) + activeCollObj = self.generate_kafka_request(str(_collector_id), _collector_kpi_id, _collector_duration, _collector_interval) response.collector_id.uuid = _collector_id.collector_id.uuid return response @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore - ) -> Empty: # type: ignore + def StopCollector(self, + request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore request.collector_id.uuid = "" return Empty() - - def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore - ) -> CollectorList: # type: ignore + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SelectCollectors(self, + request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + ) -> CollectorList: # type: ignore response = CollectorList() - - return response \ No newline at end of file + return response + + # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def generate_kafka_request(self, + msg_key, kpi: str, duration : int, interval: int + ) -> KafkaProducer: + """ + Method to generate collector request to Kafka topic. + """ + producer_configs = { + 'bootstrap.servers': KAFKA_SERVER_IP, + 'group.id' : 'requester', + } + topic_request = "topic_request" + msg_value = Tuple [str, int, int] + msg_value = (kpi, duration, interval) + + producerObj = KafkaProducer(producer_configs) + producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) + producerObj.flush() + return producerObj + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def delivery_callback(self, err, msg): + """ + Callback function to handle message delivery status. + Args: + err (KafkaError): Kafka error object. + msg (Message): Kafka message object. + """ + if err: + print(f'Message delivery failed: {err}') + else: + print(f'Message delivered to topic {msg.topic()}') \ No newline at end of file