# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple
import grpc
import logging
from common.proto.context_pb2 import Empty
from monitoring.service.NameMapping import NameMapping
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaException
from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer

LOGGER          = logging.getLogger(__name__)
METRICS_POOL    = MetricsPool('Monitoring', 'TelemetryFrontend')
# Kafka bootstrap server address ("host:port") used by every producer created below.
KAFKA_SERVER_IP = '127.0.0.1:9092'


class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
    """gRPC servicer for the telemetry frontend.

    StartCollector publishes a collector request to the Kafka topic
    "topic_request"; StopCollector and SelectCollectors currently return
    empty placeholder responses.
    """

    def __init__(self, name_mapping : NameMapping):
        # NOTE(review): name_mapping is accepted but not stored or used here.
        LOGGER.info('Init TelemetryFrontendService')

    # NOTE(review): unlike the other RPCs of this class, the metering decorator
    # is disabled on StartCollector in the original code; it is left disabled.
    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StartCollector(self,
                       request : Collector, grpc_context: grpc.ServicerContext # type: ignore
                      ) -> CollectorId: # type: ignore
        """Publish a collector request to Kafka and echo back the collector id.

        Args:
            request: Collector message; collector_id, kpi_id, duration_s and
                interval_s are read from it.
            grpc_context: gRPC servicer context (unused).

        Returns:
            CollectorId whose uuid equals the uuid carried by the request.
        """
        # TODO: push info to frontend db
        collector_uuid = request.collector_id.collector_id.uuid
        kpi_uuid       = str(request.kpi_id.kpi_id.uuid)
        duration_s     = int(request.duration_s)
        interval_s     = int(request.interval_s)

        # Key the Kafka message by the collector UUID itself. The previous code
        # used str(<CollectorId message>), i.e. the text dump of the protobuf
        # message, instead of the identifier.
        self.generate_kafka_request(collector_uuid, kpi_uuid, duration_s, interval_s)

        response = CollectorId()
        response.collector_id.uuid = collector_uuid
        return response

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StopCollector(self,
                      request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore
                     ) -> Empty: # type: ignore
        """Placeholder: clears the uuid on the received request and returns Empty."""
        request.collector_id.uuid = ""
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectCollectors(self,
                         request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
                        ) -> CollectorList: # type: ignore
        """Placeholder: ignores the filter and returns an empty CollectorList."""
        response = CollectorList()
        return response

    def generate_kafka_request(self,
                               msg_key, kpi: str, duration : int, interval: int
                              ) -> KafkaProducer:
        """Publish one collector request to the Kafka topic "topic_request".

        The message value is the string form of the tuple
        (kpi, duration, interval).

        Args:
            msg_key: Key of the Kafka message.
            kpi: KPI identifier.
            duration: Collection duration (taken from Collector.duration_s).
            interval: Collection interval (taken from Collector.interval_s).

        Returns:
            The KafkaProducer instance that was created to send the message.
        """
        # 'group.id' is a consumer-only property and has no effect on a
        # producer, so only the bootstrap servers are configured.
        producer_configs = {
            'bootstrap.servers': KAFKA_SERVER_IP,
        }
        topic_request = "topic_request"
        msg_value : Tuple[str, int, int] = (kpi, duration, interval)

        producerObj = KafkaProducer(producer_configs)
        producerObj.produce(
            topic_request, key=msg_key, value=str(msg_value),
            callback=self.delivery_callback)
        # flush() is called without a timeout so the delivery callback has run
        # before this method returns.
        producerObj.flush()
        return producerObj

    # This is a Kafka delivery-report callback taking (err, msg), not a gRPC
    # handler taking (request, grpc_context), so it is intentionally not wrapped
    # with safe_and_metered_rpc_method.
    def delivery_callback(self, err, msg):
        """
        Callback function to handle message delivery status.
        Args:
            err (KafkaError): Kafka error object.
            msg (Message): Kafka message object.
        """
        if err:
            LOGGER.error('Message delivery failed: %s', err)
        else:
            LOGGER.info('Message delivered to topic %s', msg.topic())