From 8d398733bf78762230c0b766acf9e2b8d17a7e9c Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Mon, 10 Jun 2024 11:46:16 +0000 Subject: [PATCH] minor changes in TelemetryFrontend --- .../backend/tests/testTelemetryBackend.py | 34 ++++++------- src/telemetry/database/TelemetryModel.py | 48 ++++------------- src/telemetry/database/managementDB.py | 51 ++++++++++++------- .../TelemetryFrontendServiceServicerImpl.py | 4 +- src/telemetry/frontend/tests/Messages.py | 8 ++- src/telemetry/frontend/tests/test_frontend.py | 8 +++ 6 files changed, 77 insertions(+), 76 deletions(-) diff --git a/src/telemetry/backend/tests/testTelemetryBackend.py b/src/telemetry/backend/tests/testTelemetryBackend.py index e81e98473..f8abc08cf 100644 --- a/src/telemetry/backend/tests/testTelemetryBackend.py +++ b/src/telemetry/backend/tests/testTelemetryBackend.py @@ -28,26 +28,26 @@ LOGGER = logging.getLogger(__name__) # Tests Implementation of Telemetry Backend ########################### -# def test_verify_kafka_topics(): -# LOGGER.info('test_verify_kafka_topics requesting') -# TelemetryBackendServiceObj = TelemetryBackendService() -# KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] -# response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) - -# def test_run_kafka_listener(): -# LOGGER.info('test_receive_kafka_request requesting') -# TelemetryBackendServiceObj = TelemetryBackendService() -# response = TelemetryBackendServiceObj.run_kafka_listener() -# LOGGER.debug(str(response)) -# assert isinstance(response, bool) +def test_verify_kafka_topics(): + LOGGER.info('test_verify_kafka_topics requesting') + TelemetryBackendServiceObj = TelemetryBackendService() + KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled'] + response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics) + LOGGER.debug(str(response)) + assert isinstance(response, bool) + +def test_run_kafka_listener(): + LOGGER.info('test_receive_kafka_request requesting') + TelemetryBackendServiceObj = TelemetryBackendService() + response = TelemetryBackendServiceObj.run_kafka_listener() + LOGGER.debug(str(response)) + assert isinstance(response, bool) # def test_fetch_node_exporter_metrics(): # LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ') # TelemetryBackendService.fetch_single_node_exporter_metric() -def test_stream_node_export_metrics_to_raw_topic(): - LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') - threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start() +# def test_stream_node_export_metrics_to_raw_topic(): +# LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ') +# threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start() diff --git a/src/telemetry/database/TelemetryModel.py b/src/telemetry/database/TelemetryModel.py index 8defdd2e8..54b7c13ef 100644 --- a/src/telemetry/database/TelemetryModel.py +++ b/src/telemetry/database/TelemetryModel.py @@ -17,55 +17,29 @@ from sqlalchemy.dialects.postgresql import UUID from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship - +from sqlalchemy.orm import registry logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) # Create a base class for declarative models -Base = declarative_base() - -class Kpi(Base): - __tablename__ = 'kpi' - - kpi_id = Column(UUID(as_uuid=False), primary_key=True) - kpi_description = Column(Text) - kpi_sample_type = Column(Integer) - device_id = Column(String) - endpoint_id = Column(String) - service_id = Column(String) - slice_id = Column(String) - connection_id = Column(String) - link_id = Column(String) - # monitor_flag = Column(String) - - # Relationship to Collector model: allows access to related Collector objects from a Kpi object - collectors = relationship('Collector', back_populates='kpi') - - # helps in logging the information - def __repr__(self): - return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', " - f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', " - f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', " - f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', " - f"link_id='{self.link_id}')>") +Base = registry().generate_base() +# Base = declarative_base() class Collector(Base): __tablename__ = 'collector' - collector_id = Column(UUID(as_uuid=False), primary_key=True) - kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id')) - collector = Column(String) - sampling_duration_s = Column(Float) - sampling_interval_s = Column(Float) - start_timestamp = Column(Float) - end_timestamp = Column(Float) + collector_id = Column(UUID(as_uuid=False), primary_key=True) + kpi_id = Column(UUID(as_uuid=False)) + collector_decription = Column(String) + sampling_duration_s = Column(Float) + sampling_interval_s = Column(Float) + start_timestamp = Column(Float) + end_timestamp = Column(Float) - # Relationship to Kpi model: allows access to the related Kpi object from a Collector object - kpi = relationship('Kpi', back_populates='collectors') def __repr__(self): return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', " - f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', " + f"collector='{self.collector_decription}', sampling_duration_s='{self.sampling_duration_s}', " f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', " f"end_timestamp='{self.end_timestamp}')>") \ No newline at end of file diff --git a/src/telemetry/database/managementDB.py b/src/telemetry/database/managementDB.py index 0a94c6c25..3e0cfc5fb 100644 --- a/src/telemetry/database/managementDB.py +++ b/src/telemetry/database/managementDB.py @@ -13,16 +13,18 @@ # limitations under the License. import logging, time +import sqlalchemy +import sqlalchemy_utils from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from telemetry.database.TelemetryEngine import TelemetryEngine - +from telemetry.database.TelemetryModel import Base LOGGER = logging.getLogger(__name__) -TELEMETRY_DB_NAME = "telemetryfrontend" +DB_NAME = "telemetryfrontend" -# Create a base class for declarative models -Base = declarative_base() +# # Create a base class for declarative models +# Base = declarative_base() class managementDB: def __init__(self): @@ -30,23 +32,35 @@ class managementDB: if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') return False - self.db_name = TELEMETRY_DB_NAME + self.db_name = DB_NAME self.Session = sessionmaker(bind=self.db_engine) - def create_database(self): - try: - with self.db_engine.connect() as connection: - connection.execute(f"CREATE DATABASE {self.db_name};") - LOGGER.info('managementDB initalizes database. Name: {self.db_name}') - return True - except: - LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) - return False + @staticmethod + def create_database(engine : sqlalchemy.engine.Engine) -> None: + if not sqlalchemy_utils.database_exists(engine.url): + LOGGER.info("Database created. {:}".format(engine.url)) + sqlalchemy_utils.create_database(engine.url) + + @staticmethod + def drop_database(engine : sqlalchemy.engine.Engine) -> None: + if sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.drop_database(engine.url) + + # def create_database(self): + # try: + # with self.db_engine.connect() as connection: + # connection.execute(f"CREATE DATABASE {self.db_name};") + # LOGGER.info('managementDB initalizes database. Name: {self.db_name}') + # return True + # except: + # LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) + # return False - def create_tables(self): + @staticmethod + def create_tables(engine : sqlalchemy.engine.Engine): try: - Base.metadata.create_all(self.db_engine) # type: ignore - LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name)) + Base.metadata.create_all(engine) # type: ignore + LOGGER.info("Tables created in the DB Name: {:}".format(DB_NAME)) except Exception as e: LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) @@ -59,6 +73,7 @@ class managementDB: except Exception as e: LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + @staticmethod def add_row_to_db(self, row): session = self.Session() try: @@ -103,7 +118,7 @@ class managementDB: LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) finally: session.close() - + def select_with_filter(self, model, **filters): session = self.Session() try: diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index d10e9dffd..c63b42cbf 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -55,12 +55,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): collector_to_insert = CollectorModel() collector_to_insert.collector_id = request.collector_id.collector_id.uuid collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid - collector_to_insert.collector = "DESC 1" + # collector_to_insert.collector_decription= request.collector collector_to_insert.sampling_duration_s = request.duration_s collector_to_insert.sampling_interval_s = request.interval_s collector_to_insert.start_timestamp = time.time() collector_to_insert.end_timestamp = time.time() - self.managementDBobj.add_row_to_db(collector_to_insert) + managementDB.add_row_to_db(collector_to_insert) except Exception as e: LOGGER.info("Unable to create collectorModel class object. {:}".format(e)) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 48668f7bf..6dc1dffa9 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -17,6 +17,8 @@ import random from common.proto import telemetry_frontend_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType + +# ----------------------- "2nd" Iteration -------------------------------- def create_collector_id(): _collector_id = telemetry_frontend_pb2.CollectorId() _collector_id.collector_id.uuid = uuid.uuid4() @@ -31,16 +33,18 @@ def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" + # _create_collector_request.collector = "collector description" _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request def create_collector_filter(): _create_collector_filter = telemetry_frontend_pb2.CollectorFilter() - new_kpi_id = _create_collector_filter.kpi_id.add() - new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" + new_kpi_id = _create_collector_filter.kpi_id.add() + new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f" return _create_collector_filter +# ----------------------- "First" Iteration -------------------------------- # def create_collector_request_a(): # _create_collector_request_a = telemetry_frontend_pb2.Collector() # _create_collector_request_a.collector_id.collector_id.uuid = "-1" diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 7d050349b..e33545dcc 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -32,6 +32,8 @@ from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendC from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter) +from telemetry.database.managementDB import managementDB +from telemetry.database.TelemetryEngine import TelemetryEngine from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService @@ -166,6 +168,12 @@ def telemetryFrontend_client( # Tests Implementation of Telemetry Frontend ########################### +def test_verify_db_and_table(): + LOGGER.info(' >>> test_verify_database_and_tables START: <<< ') + _engine = TelemetryEngine.get_engine() + managementDB.create_database(_engine) + managementDB.create_tables(_engine) + def test_StartCollector(telemetryFrontend_client): LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) -- GitLab