Loading src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +58 −16 Original line number Diff line number Diff line Loading @@ -12,10 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. from typing import Tuple import grpc import logging from common.proto.context_pb2 import Empty from monitoring.service.NameMapping import NameMapping from confluent_kafka import Producer as KafkaProducer from confluent_kafka import KafkaException from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer Loading @@ -23,31 +26,70 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') KAFKA_SERVER_IP = '127.0.0.1:9092' class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init TelemetryFrontendService') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore # push info to frontend db response = CollectorId() _collector_id = request.collector_id # collector_kpi_id = request.kpi_id # collector_duration = request.duration_s # collector_interval = request.interval_s _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) _collector_duration = int(request.duration_s) _collector_interval = int(request.interval_s) activeCollObj = self.generate_kafka_request(str(_collector_id), _collector_kpi_id, _collector_duration, _collector_interval) response.collector_id.uuid = _collector_id.collector_id.uuid return response @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore request.collector_id.uuid = "" return Empty() def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore response = CollectorList() return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def generate_kafka_request(self, msg_key, kpi: str, duration : int, interval: int ) -> KafkaProducer: """ Method to generate collector request to Kafka topic. """ producer_configs = { 'bootstrap.servers': KAFKA_SERVER_IP, 'group.id' : 'requester', } topic_request = "topic_request" msg_value = Tuple [str, int, int] msg_value = (kpi, duration, interval) producerObj = KafkaProducer(producer_configs) producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) producerObj.flush() return producerObj @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to topic {msg.topic()}') No newline at end of file Loading
src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +58 −16 Original line number Diff line number Diff line Loading @@ -12,10 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. from typing import Tuple import grpc import logging from common.proto.context_pb2 import Empty from monitoring.service.NameMapping import NameMapping from confluent_kafka import Producer as KafkaProducer from confluent_kafka import KafkaException from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer Loading @@ -23,31 +26,70 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') KAFKA_SERVER_IP = '127.0.0.1:9092' class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init TelemetryFrontendService') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore # push info to frontend db response = CollectorId() _collector_id = request.collector_id # collector_kpi_id = request.kpi_id # collector_duration = request.duration_s # collector_interval = request.interval_s _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) _collector_duration = int(request.duration_s) _collector_interval = int(request.interval_s) activeCollObj = self.generate_kafka_request(str(_collector_id), _collector_kpi_id, _collector_duration, _collector_interval) response.collector_id.uuid = _collector_id.collector_id.uuid return response @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore request.collector_id.uuid = "" return Empty() def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore response = CollectorList() return response # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def generate_kafka_request(self, msg_key, kpi: str, duration : int, interval: int ) -> KafkaProducer: """ Method to generate collector request to Kafka topic. """ producer_configs = { 'bootstrap.servers': KAFKA_SERVER_IP, 'group.id' : 'requester', } topic_request = "topic_request" msg_value = Tuple [str, int, int] msg_value = (kpi, duration, interval) producerObj = KafkaProducer(producer_configs) producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) producerObj.flush() return producerObj @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to topic {msg.topic()}') No newline at end of file