From ee9c2b6c1594980da3aececd021c653852df9adb Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Tue, 6 Aug 2024 15:10:17 +0000 Subject: [PATCH] changes in Telemetry Frontend service and client. - collector description is removed from TelemetryModel. - "ConvertCollectorToRow" is added in Telemetry Model class. - NameMapping is removed from service client and service. - TelemetryDB object name and import is updated with correct class name. - StartCollector is restructured. - "PublishRequestOnKafka" is restructured. --- src/telemetry/database/TelemetryModel.py | 29 +++-- .../service/TelemetryFrontendService.py | 5 +- .../TelemetryFrontendServiceServicerImpl.py | 107 +++++++----------- src/telemetry/frontend/tests/Messages.py | 3 +- 4 files changed, 66 insertions(+), 78 deletions(-) diff --git a/src/telemetry/database/TelemetryModel.py b/src/telemetry/database/TelemetryModel.py index 95f692e4b..1faf16e1a 100644 --- a/src/telemetry/database/TelemetryModel.py +++ b/src/telemetry/database/TelemetryModel.py @@ -28,17 +28,32 @@ class Collector(Base): collector_id = Column(UUID(as_uuid=False), primary_key=True) kpi_id = Column(UUID(as_uuid=False), nullable=False) - collector_decription = Column(String , nullable=False) sampling_duration_s = Column(Float , nullable=False) sampling_interval_s = Column(Float , nullable=False) - start_timestamp = Column(Float , nullable=False) - end_timestamp = Column(Float , nullable=False) + start_timestamp = Column(String , nullable=False) + end_timestamp = Column(String , nullable=False) # helps in logging the information def __repr__(self): - return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', " - f"collector='{self.collector_decription}', sampling_duration_s='{self.sampling_duration_s}', " - f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', " - f"end_timestamp='{self.end_timestamp}')>") + return (f"<Collector(collector_id='{self.collector_id}' , kpi_id='{self.kpi_id}', " + f"sampling_duration_s='{self.sampling_duration_s}', sampling_interval_s='{self.sampling_interval_s}'," + f"start_timestamp='{self.start_timestamp}' , end_timestamp='{self.end_timestamp}')>") + + @classmethod + def ConvertCollectorToRow(cls, request): + """ + Create an instance of collector rows from a request object. + Args: request: The request object containing collector gRPC message. + Returns: A row (an instance of Collector table) initialized with content of the request. + """ + return cls( + collector_id = request.collector_id.collector_id.uuid, + kpi_id = request.kpi_id.kpi_id.uuid, + sampling_duration_s = request.duration_s, + sampling_interval_s = request.interval_s, + start_timestamp = request.start_time.timestamp, + end_timestamp = request.end_time.timestamp + ) # add method to convert gRPC requests to rows if necessary... + diff --git a/src/telemetry/frontend/service/TelemetryFrontendService.py b/src/telemetry/frontend/service/TelemetryFrontendService.py index dc3f8df36..abd361aa0 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendService.py +++ b/src/telemetry/frontend/service/TelemetryFrontendService.py @@ -14,17 +14,16 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from monitoring.service.NameMapping import NameMapping from common.tools.service.GenericGrpcService import GenericGrpcService from common.proto.telemetry_frontend_pb2_grpc import add_TelemetryFrontendServiceServicer_to_server from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl class TelemetryFrontendService(GenericGrpcService): - def __init__(self, name_mapping : NameMapping, cls_name: str = __name__) -> None: + def __init__(self, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) super().__init__(port, cls_name=cls_name) - self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl(name_mapping) + self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl() def install_servicers(self): add_TelemetryFrontendServiceServicer_to_server(self.telemetry_frontend_servicer, self.server) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index e6830ad67..49641aae1 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -14,100 +14,74 @@ import ast import threading -import time from typing import Tuple, Any import grpc import logging +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from confluent_kafka import Consumer as KafkaConsumer -from common.proto.context_pb2 import Empty -from monitoring.service.NameMapping import NameMapping from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaException from confluent_kafka import KafkaError + +from common.proto.context_pb2 import Empty + from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, CollectorFilter, CollectorList -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer - from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.managementDB import managementDB +from telemetry.database.Telemetry_DB import TelemetryDB + LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') -KAFKA_SERVER_IP = '127.0.0.1:9092' +METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC') ACTIVE_COLLECTORS = [] -KAFKA_TOPICS = {'request' : 'topic_request', - 'response': 'topic_response'} class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): - def __init__(self, name_mapping : NameMapping): + def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.managementDBobj = managementDB() - self.kafka_producer = KafkaProducer({'bootstrap.servers': KAFKA_SERVER_IP,}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KAFKA_SERVER_IP, - 'group.id' : 'frontend', - 'auto.offset.reset' : 'latest'}) + self.DBobj = TelemetryDB() + self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, + 'group.id' : 'frontend', + 'auto.offset.reset' : 'latest'}) - def add_collector_to_db(self, request: Collector ): # type: ignore - try: - # Create a new Collector instance - collector_to_insert = CollectorModel() - collector_to_insert.collector_id = request.collector_id.collector_id.uuid - collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid - # collector_to_insert.collector_decription= request.collector - collector_to_insert.sampling_duration_s = request.duration_s - collector_to_insert.sampling_interval_s = request.interval_s - collector_to_insert.start_timestamp = time.time() - collector_to_insert.end_timestamp = time.time() - managementDB.add_row_to_db(collector_to_insert) - except Exception as e: - LOGGER.info("Unable to create collectorModel class object. {:}".format(e)) - - # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore - # push info to frontend db LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() - _collector_id = str(request.collector_id.collector_id.uuid) - _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) - _collector_duration = int(request.duration_s) - _collector_interval = int(request.interval_s) - # pushing Collector to DB - self.add_collector_to_db(request) - self.publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) - # self.run_publish_to_kafka_request_topic(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) + + # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists. + self.DBobj.add_row_to_db( + CollectorModel.ConvertCollectorToRow(request) + ) + self.PublishRequestOnKafka(request) + response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore return response - - def run_publish_to_kafka_request_topic(self, msg_key: str, kpi: str, duration : int, interval: int): - # Add threading.Thread() response to dictonary and call start() in the next statement - threading.Thread(target=self.publish_to_kafka_request_topic, args=(msg_key, kpi, duration, interval)).start() - - def publish_to_kafka_request_topic(self, - collector_id: str, kpi: str, duration : int, interval: int - ): + + def PublishRequestOnKafka(self, collector_obj): """ - Method to generate collector request to Kafka topic. + Method to generate collector request on Kafka. """ - # time.sleep(5) - # producer_configs = { - # 'bootstrap.servers': KAFKA_SERVER_IP, - # } - # topic_request = "topic_request" - msg_value : Tuple [str, int, int] = (kpi, duration, interval) - # print ("Request generated: ", "Colletcor Id: ", collector_id, \ - # ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) - # producerObj = KafkaProducer(producer_configs) - self.kafka_producer.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) - # producerObj.produce(KAFKA_TOPICS['request'], key=collector_id, value= str(msg_value), callback=self.delivery_callback) - LOGGER.info("Collector Request Generated: {:}, {:}, {:}, {:}".format(collector_id, kpi, duration, interval)) - # producerObj.produce(topic_request, key=collector_id, value= str(msg_value), callback=self.delivery_callback) + collector_id = collector_obj.collector_id.collector_id.uuid + collector_to_generate : Tuple [str, int, int] = ( + collector_obj.kpi_id.kpi_id.uuid, + collector_obj.duration_s, + collector_obj.interval_s + ) + self.kafka_producer.produce( + KafkaTopic.REQUEST.value, + key = collector_id, + value = str(collector_to_generate), + callback = self.delivery_callback + ) + LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_id, collector_to_generate)) ACTIVE_COLLECTORS.append(collector_id) self.kafka_producer.flush() - + def run_kafka_listener(self): # print ("--- STARTED: run_kafka_listener ---") threading.Thread(target=self.kafka_listener).start() @@ -201,4 +175,5 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): response.collector_list.append(collector_obj) return response except Exception as e: - LOGGER.info('Unable to process response {:}'.format(e)) \ No newline at end of file + LOGGER.info('Unable to process response {:}'.format(e)) + diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 1205898d1..106c2a5a7 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -17,7 +17,6 @@ import random from common.proto import telemetry_frontend_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType - # ----------------------- "2nd" Iteration -------------------------------- def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() @@ -32,7 +31,7 @@ def create_collector_id(): def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" + _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_collector_request.collector = "collector description" _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) -- GitLab