diff --git a/src/telemetry/database/TelemetryModel.py b/src/telemetry/database/TelemetryModel.py index 1faf16e1acc7b3f20ee450525b3178953ae9ca64..611ce7e70e45e71fc28cc3608906d3d17d6f7d54 100644 --- a/src/telemetry/database/TelemetryModel.py +++ b/src/telemetry/database/TelemetryModel.py @@ -16,6 +16,7 @@ import logging from sqlalchemy.dialects.postgresql import UUID from sqlalchemy import Column, String, Float from sqlalchemy.orm import registry +from common.proto import telemetry_frontend_pb2 logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) @@ -30,8 +31,8 @@ class Collector(Base): kpi_id = Column(UUID(as_uuid=False), nullable=False) sampling_duration_s = Column(Float , nullable=False) sampling_interval_s = Column(Float , nullable=False) - start_timestamp = Column(String , nullable=False) - end_timestamp = Column(String , nullable=False) + start_timestamp = Column(Float , nullable=False) + end_timestamp = Column(Float , nullable=False) # helps in logging the information def __repr__(self): @@ -42,7 +43,7 @@ class Collector(Base): @classmethod def ConvertCollectorToRow(cls, request): """ - Create an instance of collector rows from a request object. + Create an instance of Collector table rows from a request object. Args: request: The request object containing collector gRPC message. Returns: A row (an instance of Collector table) initialized with content of the request. """ @@ -55,5 +56,18 @@ class Collector(Base): end_timestamp = request.end_time.timestamp ) -# add method to convert gRPC requests to rows if necessary... - + @classmethod + def ConvertRowToCollector(cls, row): + """ + Create and return a dictionary representation of a Collector table instance. + Args: row: The Collector table instance (row) containing the data. + Returns: collector gRPC message initialized with the content of a row. + """ + response = telemetry_frontend_pb2.Collector() + response.collector_id.collector_id.uuid = row.collector_id + response.kpi_id.kpi_id.uuid = row.kpi_id + response.duration_s = row.sampling_duration_s + response.interval_s = row.sampling_interval_s + response.start_time.timestamp = row.start_timestamp + response.end_time.timestamp = row.end_timestamp + return response diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index ec7da9e400bde6f34d6c1b14b509e0426eab1dad..32acfd73a410a7bfddd6b487d0b1962afadb3842 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -121,13 +121,13 @@ class TelemetryDB: query = session.query(CollectorModel) # Apply filters based on the filter_object if filter_object.kpi_id: - query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) result = query.all() - + # query should be added to return all rows if result: LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} else: - LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}") + LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") return result except Exception as e: LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 49641aae1a19387414efddf7ee0eca0aea567c4c..29c192bdf819df0c3865c4917598b36e76095fef 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -34,18 +34,20 @@ from telemetry.database.Telemetry_DB import TelemetryDB LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryFrontend', 'NBIgRPC') -ACTIVE_COLLECTORS = [] +ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.DBobj = TelemetryDB() + self.tele_db_obj = TelemetryDB() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value, 'group.id' : 'frontend', 'auto.offset.reset' : 'latest'}) + # --->>> SECTION: StartCollector with all helper methods <<<--- + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, request : Collector, grpc_context: grpc.ServicerContext # type: ignore @@ -54,7 +56,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): response = CollectorId() # TODO: Verify the presence of Kpi ID in KpiDB or assume that KPI ID already exists. - self.DBobj.add_row_to_db( + self.tele_db_obj.add_row_to_db( CollectorModel.ConvertCollectorToRow(request) ) self.PublishRequestOnKafka(request) @@ -66,7 +68,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): """ Method to generate collector request on Kafka. """ - collector_id = collector_obj.collector_id.collector_id.uuid + collector_uuid = collector_obj.collector_id.collector_id.uuid collector_to_generate : Tuple [str, int, int] = ( collector_obj.kpi_id.kpi_id.uuid, collector_obj.duration_s, @@ -74,12 +76,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): ) self.kafka_producer.produce( KafkaTopic.REQUEST.value, - key = collector_id, + key = collector_uuid, value = str(collector_to_generate), callback = self.delivery_callback ) - LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_id, collector_to_generate)) - ACTIVE_COLLECTORS.append(collector_id) + LOGGER.info("Collector Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_generate)) + ACTIVE_COLLECTORS.append(collector_uuid) self.kafka_producer.flush() def run_kafka_listener(self): @@ -141,39 +143,59 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): msg (Message): Kafka message object. """ if err: - print(f'Message delivery failed: {err}') + LOGGER.debug('Message delivery failed: {:}'.format(err)) + print('Message delivery failed: {:}'.format(err)) else: - print(f'Message delivered to topic {msg.topic()}') + LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + print('Message delivered to topic {:}'.format(msg.topic())) + + # <<<--- SECTION: StopCollector with all helper methods --->>> @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) - _collector_id = request.collector_id.uuid - self.publish_to_kafka_request_topic(_collector_id, "", -1, -1) + self.PublishStopRequestOnKafka(request) return Empty() + def PublishStopRequestOnKafka(self, collector_id): + """ + Method to generate stop collector request on Kafka. + """ + collector_uuid = collector_id.collector_id.uuid + collector_to_stop : Tuple [str, int, int] = ( + collector_uuid , -1, -1 + ) + self.kafka_producer.produce( + KafkaTopic.REQUEST.value, + key = collector_uuid, + value = str(collector_to_stop), + callback = self.delivery_callback + ) + LOGGER.info("Collector Stop Request Generated: Collector Id: {:}, Value: {:}".format(collector_uuid, collector_to_stop)) + try: + ACTIVE_COLLECTORS.remove(collector_uuid) + except ValueError: + LOGGER.warning('Collector ID {:} not found in active collector list'.format(collector_uuid)) + self.kafka_producer.flush() + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore LOGGER.info("gRPC message: {:}".format(request)) response = CollectorList() - filter_to_apply = dict() - filter_to_apply['kpi_id'] = request.kpi_id[0].kpi_id.uuid - # filter_to_apply['duration_s'] = request.duration_s[0] + try: - rows = self.managementDBobj.select_with_filter(CollectorModel, **filter_to_apply) + rows = self.tele_db_obj.select_with_filter(CollectorModel, request) except Exception as e: LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) try: - if len(rows) != 0: - for row in rows: - collector_obj = Collector() - collector_obj.collector_id.collector_id.uuid = row.collector_id - response.collector_list.append(collector_obj) + for row in rows: + collector_obj = CollectorModel.ConvertRowToCollector(row) + response.collector_list.append(collector_obj) return response except Exception as e: - LOGGER.info('Unable to process response {:}'.format(e)) + LOGGER.info('Unable to process filter response {:}'.format(e)) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 106c2a5a7e03fa9c5b0802388cc9bde333a53c90..a0e93e8a121b9efaac83f7169419911c8ee6e3ea 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -16,67 +16,27 @@ import uuid import random from common.proto import telemetry_frontend_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiId # ----------------------- "2nd" Iteration -------------------------------- def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() - _collector_id.collector_id.uuid = uuid.uuid4() + # _collector_id.collector_id.uuid = str(uuid.uuid4()) + _collector_id.collector_id.uuid = "5d45f53f-d567-429f-9427-9196ac72ff0c" return _collector_id -# def create_collector_id_a(coll_id_str : str): -# _collector_id = telemetry_frontend_pb2.CollectorId() -# _collector_id.collector_id.uuid = str(coll_id_str) -# return _collector_id - def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - # _create_collector_request.collector = "collector description" _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() - new_kpi_id = _create_collector_filter.kpi_id.add() - new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" + kpi_id_obj = KpiId() + # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4()) + kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f" + _create_collector_filter.kpi_id.append(kpi_id_obj) return _create_collector_filter - -# ----------------------- "First" Iteration -------------------------------- -# def create_collector_request_a(): -# _create_collector_request_a = telemetry_frontend_pb2.Collector() -# _create_collector_request_a.collector_id.collector_id.uuid = "-1" -# return _create_collector_request_a - -# def create_collector_request_b(str_kpi_id, coll_duration_s, coll_interval_s -# ) -> telemetry_frontend_pb2.Collector: -# _create_collector_request_b = telemetry_frontend_pb2.Collector() -# _create_collector_request_b.collector_id.collector_id.uuid = '1' -# _create_collector_request_b.kpi_id.kpi_id.uuid = str_kpi_id -# _create_collector_request_b.duration_s = coll_duration_s -# _create_collector_request_b.interval_s = coll_interval_s -# return _create_collector_request_b - -# def create_collector_filter(): -# _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() -# new_collector_id = _create_collector_filter.collector_id.add() -# new_collector_id.collector_id.uuid = "COLL1" -# new_kpi_id = _create_collector_filter.kpi_id.add() -# new_kpi_id.kpi_id.uuid = "KPI1" -# new_device_id = _create_collector_filter.device_id.add() -# new_device_id.device_uuid.uuid = 'DEV1' -# new_service_id = _create_collector_filter.service_id.add() -# new_service_id.service_uuid.uuid = 'SERV1' -# new_slice_id = _create_collector_filter.slice_id.add() -# new_slice_id.slice_uuid.uuid = 'SLC1' -# new_endpoint_id = _create_collector_filter.endpoint_id.add() -# new_endpoint_id.endpoint_uuid.uuid = 'END1' -# new_connection_id = _create_collector_filter.connection_id.add() -# new_connection_id.connection_uuid.uuid = 'CON1' -# _create_collector_filter.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) -# return _create_collector_filter - -# def create_collector_list(): -# _create_collector_list = telemetry_frontend_pb2.CollectorList() -# return _create_collector_list \ No newline at end of file diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index ca2d613701fad97a423de9fe82e6ed3654475e7b..d967e306ac9be501b80f7ec7b83cd14329d0f793 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -19,6 +19,7 @@ import logging # from common.proto.context_pb2 import Empty from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList +from common.proto.context_pb2 import Empty from common.Settings import ( get_service_port_grpc, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC) @@ -26,7 +27,7 @@ from common.Settings import ( from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.tests.Messages import ( - create_collector_request, create_collector_filter) + create_collector_request, create_collector_id, create_collector_filter) ########################### @@ -85,6 +86,18 @@ def test_StartCollector(telemetryFrontend_client): LOGGER.debug(str(response)) assert isinstance(response, CollectorId) +def test_StopCollector(telemetryFrontend_client): + LOGGER.info(' >>> test_StopCollector START: <<< ') + response = telemetryFrontend_client.StopCollector(create_collector_id()) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) + +def test_SelectCollectors(telemetryFrontend_client): + LOGGER.info(' >>> test_SelectCollectors START: <<< ') + response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, CollectorList) + # ------- previous test ---------------- # def test_verify_db_and_table():