Loading src/telemetry/database/TelemetryDBmanager.py +80 −5 Original line number Diff line number Diff line Loading @@ -20,7 +20,7 @@ from telemetry.database.TelemetryModel import Kpi as KpiModel from sqlalchemy.ext.declarative import declarative_base from telemetry.database.TelemetryEngine import TelemetryEngine from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.telemetry_frontend_pb2 import Collector from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from sqlalchemy.exc import SQLAlchemyError Loading Loading @@ -65,6 +65,8 @@ class TelemetryDBmanager: except Exception as e: LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) # ------------------ INSERT METHODs -------------------------------------- def inser_kpi(self, request: KpiDescriptor): session = self.Session() try: Loading Loading @@ -114,16 +116,18 @@ class TelemetryDBmanager: # Close the session session.close() def get_kpi_descriptor(self, kpi_id: KpiId): # ------------------ GET METHODs -------------------------------------- def get_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_get = kpi_id.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_get).first() kpi_id_to_search = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first() if kpi: LOGGER.info("kpi ID found: {:s}".format(str(kpi))) return kpi else: LOGGER.info("Kpi ID not found{:s}".format(str(kpi_id_to_get))) LOGGER.warning("Kpi ID not found{:s}".format(str(kpi_id_to_search))) return None except Exception as e: session.rollback() Loading @@ -132,6 +136,26 @@ class TelemetryDBmanager: finally: session.close() def get_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_search = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first() if collector: LOGGER.info("collector ID found: {:s}".format(str(collector))) return collector else: LOGGER.warning("collector ID not found{:s}".format(str(collector_id_to_search))) return None except Exception as e: session.rollback() LOGGER.info("Failed to retrieve collector ID. {:s}".format(str(e))) raise finally: session.close() # ------------------ SELECT METHODs -------------------------------------- def select_kpi_descriptor(self, **filters): session = self.Session() try: Loading @@ -146,3 +170,54 @@ class TelemetryDBmanager: return [] finally: session.close() def select_collector(self, **filters): session = self.Session() try: query = session.query(CollectorModel) for column, value in filters.items(): query = query.filter(getattr(CollectorModel, column) == value) result = query.all() LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) return [] finally: session.close() # ------------------ DELETE METHODs -------------------------------------- def delete_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_delete = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first() if kpi: session.delete(kpi) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete) else: LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e) finally: session.close() def delete_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_delete = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first() if collector: session.delete(collector) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", collector_id_to_delete) else: LOGGER.warning("KPI with kpi_id %s not found", collector_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", collector_id_to_delete, e) finally: session.close() No newline at end of file src/telemetry/database/tests/messages.py +12 −0 Original line number Diff line number Diff line Loading @@ -46,9 +46,21 @@ def create_kpi_id_request(): _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_id_request def create_collector_id_request(): _create_collector_id_request = telemetry_frontend_pb2.CollectorId() _create_collector_id_request.collector_id.uuid = '50ba9199-7e9d-45b5-a2fc-3f97917bad65' return _create_collector_id_request def create_kpi_filter_request(): # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. _create_kpi_filter_request = dict() _create_kpi_filter_request['kpi_sample_type'] = 102 _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_filter_request def create_collector_filter_request(): # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. _create_kpi_filter_request = dict() _create_kpi_filter_request['sampling_interval_s'] = 3.0 # _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_filter_request No newline at end of file src/telemetry/database/tests/telemetryDBtests.py +28 −4 Original line number Diff line number Diff line Loading @@ -19,7 +19,8 @@ from telemetry.database.TelemetryDBmanager import TelemetryDBmanager from telemetry.database.TelemetryEngine import TelemetryEngine from telemetry.database.tests import temp_DB from .messages import create_kpi_request, create_collector_request, \ create_kpi_id_request, create_kpi_filter_request create_kpi_id_request, create_kpi_filter_request, \ create_collector_id_request, create_collector_filter_request logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) Loading @@ -29,22 +30,45 @@ LOGGER = logging.getLogger(__name__) def test_telemetry_object_creation(): LOGGER.info('--- test_telemetry_object_creation: START') LOGGER.info('>>> Creating TelemetryDBmanager Object: ') TelemetryDBmanagerObj = TelemetryDBmanager() # LOGGER.info('>>> Creating Tables: ') # TelemetryDBmanagerObj.create_tables() # LOGGER.info('>>> Verifing Table creation: ') # TelemetryDBmanagerObj.verify_tables() LOGGER.info('>>> Row Insertion Operation: kpi Table') LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table') kpi_obj = create_kpi_request() TelemetryDBmanagerObj.inser_kpi(kpi_obj) LOGGER.info('>>> Row Insertion Operation: collector Table') LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table') collector_obj = create_collector_request() TelemetryDBmanagerObj.insert_collector(collector_obj) LOGGER.info('>>> Get KpiDescriptor ') LOGGER.info('>>> TESTING: Get KpiDescriptor ') kpi_id_obj = create_kpi_id_request() TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj) LOGGER.info('>>> TESTING: Select Collector ') collector_id_obj = create_collector_id_request() TelemetryDBmanagerObj.get_collector(collector_id_obj) LOGGER.info('>>> TESTING: Applying kpi filter ') kpi_filter : dict[str, Any] = create_kpi_filter_request() TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter) LOGGER.info('>>> TESTING: Applying collector filter ') collector_filter : dict[str, Any] = create_collector_filter_request() TelemetryDBmanagerObj.select_collector(**collector_filter) LOGGER.info('>>> TESTING: Delete KpiDescriptor ') kpi_id_obj = create_kpi_id_request() TelemetryDBmanagerObj.delete_kpi_descriptor(kpi_id_obj) LOGGER.info('>>> TESTING: Delete Collector ') collector_id_obj = create_collector_id_request() TelemetryDBmanagerObj.delete_collector(collector_id_obj) No newline at end of file src/telemetry/database/tests/temp_DB.py +45 −2 Original line number Diff line number Diff line Loading @@ -205,6 +205,41 @@ class DatabaseManager: session.close() LOGGER.info("get_filtered_collector_rows method execution finished.") def delete_kpi_by_id(self, kpi_id): session = self.Session() try: kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first() if kpi: session.delete(kpi) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id) else: LOGGER.warning("KPI with kpi_id %s not found", kpi_id) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id, e) finally: session.close() LOGGER.info("delete_kpi_by_id method execution finished.") def delete_collector_by_id(self, collector_id): session = self.Session() try: collector = session.query(Collector).filter_by(collector_id=collector_id).first() if collector: session.delete(collector) session.commit() LOGGER.info("Deleted Collector with collector_id: %s", collector_id) else: LOGGER.warning("Collector with collector_id %s not found", collector_id) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting Collector with collector_id %s: %s", collector_id, e) finally: session.close() LOGGER.info("delete_collector_by_id method execution finished.") # Example Usage def main(): CRDB_SQL_PORT = "26257" Loading Loading @@ -282,3 +317,11 @@ def main(): # Get filtered rows from Collector table filtered_collector_rows = db_manager.get_filtered_collector_rows(collector='Collector 1') LOGGER.info("Filtered Collector Rows: %s", filtered_collector_rows) # Delete a KPI by kpi_id kpi_id_to_delete = '123e4567-e89b-12d3-a456-426614174000' db_manager.delete_kpi_by_id(kpi_id_to_delete) # Delete a Collector by collector_id collector_id_to_delete = '123e4567-e89b-12d3-a456-426614174001' db_manager.delete_collector_by_id(collector_id_to_delete) Loading
src/telemetry/database/TelemetryDBmanager.py +80 −5 Original line number Diff line number Diff line Loading @@ -20,7 +20,7 @@ from telemetry.database.TelemetryModel import Kpi as KpiModel from sqlalchemy.ext.declarative import declarative_base from telemetry.database.TelemetryEngine import TelemetryEngine from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.telemetry_frontend_pb2 import Collector from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from sqlalchemy.exc import SQLAlchemyError Loading Loading @@ -65,6 +65,8 @@ class TelemetryDBmanager: except Exception as e: LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) # ------------------ INSERT METHODs -------------------------------------- def inser_kpi(self, request: KpiDescriptor): session = self.Session() try: Loading Loading @@ -114,16 +116,18 @@ class TelemetryDBmanager: # Close the session session.close() def get_kpi_descriptor(self, kpi_id: KpiId): # ------------------ GET METHODs -------------------------------------- def get_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_get = kpi_id.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_get).first() kpi_id_to_search = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first() if kpi: LOGGER.info("kpi ID found: {:s}".format(str(kpi))) return kpi else: LOGGER.info("Kpi ID not found{:s}".format(str(kpi_id_to_get))) LOGGER.warning("Kpi ID not found{:s}".format(str(kpi_id_to_search))) return None except Exception as e: session.rollback() Loading @@ -132,6 +136,26 @@ class TelemetryDBmanager: finally: session.close() def get_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_search = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first() if collector: LOGGER.info("collector ID found: {:s}".format(str(collector))) return collector else: LOGGER.warning("collector ID not found{:s}".format(str(collector_id_to_search))) return None except Exception as e: session.rollback() LOGGER.info("Failed to retrieve collector ID. {:s}".format(str(e))) raise finally: session.close() # ------------------ SELECT METHODs -------------------------------------- def select_kpi_descriptor(self, **filters): session = self.Session() try: Loading @@ -146,3 +170,54 @@ class TelemetryDBmanager: return [] finally: session.close() def select_collector(self, **filters): session = self.Session() try: query = session.query(CollectorModel) for column, value in filters.items(): query = query.filter(getattr(CollectorModel, column) == value) result = query.all() LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) return [] finally: session.close() # ------------------ DELETE METHODs -------------------------------------- def delete_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_delete = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first() if kpi: session.delete(kpi) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete) else: LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e) finally: session.close() def delete_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_delete = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first() if collector: session.delete(collector) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", collector_id_to_delete) else: LOGGER.warning("KPI with kpi_id %s not found", collector_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", collector_id_to_delete, e) finally: session.close() No newline at end of file
src/telemetry/database/tests/messages.py +12 −0 Original line number Diff line number Diff line Loading @@ -46,9 +46,21 @@ def create_kpi_id_request(): _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_id_request def create_collector_id_request(): _create_collector_id_request = telemetry_frontend_pb2.CollectorId() _create_collector_id_request.collector_id.uuid = '50ba9199-7e9d-45b5-a2fc-3f97917bad65' return _create_collector_id_request def create_kpi_filter_request(): # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. _create_kpi_filter_request = dict() _create_kpi_filter_request['kpi_sample_type'] = 102 _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_filter_request def create_collector_filter_request(): # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. _create_kpi_filter_request = dict() _create_kpi_filter_request['sampling_interval_s'] = 3.0 # _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' return _create_kpi_filter_request No newline at end of file
src/telemetry/database/tests/telemetryDBtests.py +28 −4 Original line number Diff line number Diff line Loading @@ -19,7 +19,8 @@ from telemetry.database.TelemetryDBmanager import TelemetryDBmanager from telemetry.database.TelemetryEngine import TelemetryEngine from telemetry.database.tests import temp_DB from .messages import create_kpi_request, create_collector_request, \ create_kpi_id_request, create_kpi_filter_request create_kpi_id_request, create_kpi_filter_request, \ create_collector_id_request, create_collector_filter_request logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) Loading @@ -29,22 +30,45 @@ LOGGER = logging.getLogger(__name__) def test_telemetry_object_creation(): LOGGER.info('--- test_telemetry_object_creation: START') LOGGER.info('>>> Creating TelemetryDBmanager Object: ') TelemetryDBmanagerObj = TelemetryDBmanager() # LOGGER.info('>>> Creating Tables: ') # TelemetryDBmanagerObj.create_tables() # LOGGER.info('>>> Verifing Table creation: ') # TelemetryDBmanagerObj.verify_tables() LOGGER.info('>>> Row Insertion Operation: kpi Table') LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table') kpi_obj = create_kpi_request() TelemetryDBmanagerObj.inser_kpi(kpi_obj) LOGGER.info('>>> Row Insertion Operation: collector Table') LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table') collector_obj = create_collector_request() TelemetryDBmanagerObj.insert_collector(collector_obj) LOGGER.info('>>> Get KpiDescriptor ') LOGGER.info('>>> TESTING: Get KpiDescriptor ') kpi_id_obj = create_kpi_id_request() TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj) LOGGER.info('>>> TESTING: Select Collector ') collector_id_obj = create_collector_id_request() TelemetryDBmanagerObj.get_collector(collector_id_obj) LOGGER.info('>>> TESTING: Applying kpi filter ') kpi_filter : dict[str, Any] = create_kpi_filter_request() TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter) LOGGER.info('>>> TESTING: Applying collector filter ') collector_filter : dict[str, Any] = create_collector_filter_request() TelemetryDBmanagerObj.select_collector(**collector_filter) LOGGER.info('>>> TESTING: Delete KpiDescriptor ') kpi_id_obj = create_kpi_id_request() TelemetryDBmanagerObj.delete_kpi_descriptor(kpi_id_obj) LOGGER.info('>>> TESTING: Delete Collector ') collector_id_obj = create_collector_id_request() TelemetryDBmanagerObj.delete_collector(collector_id_obj) No newline at end of file
src/telemetry/database/tests/temp_DB.py +45 −2 Original line number Diff line number Diff line Loading @@ -205,6 +205,41 @@ class DatabaseManager: session.close() LOGGER.info("get_filtered_collector_rows method execution finished.") def delete_kpi_by_id(self, kpi_id): session = self.Session() try: kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first() if kpi: session.delete(kpi) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id) else: LOGGER.warning("KPI with kpi_id %s not found", kpi_id) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id, e) finally: session.close() LOGGER.info("delete_kpi_by_id method execution finished.") def delete_collector_by_id(self, collector_id): session = self.Session() try: collector = session.query(Collector).filter_by(collector_id=collector_id).first() if collector: session.delete(collector) session.commit() LOGGER.info("Deleted Collector with collector_id: %s", collector_id) else: LOGGER.warning("Collector with collector_id %s not found", collector_id) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting Collector with collector_id %s: %s", collector_id, e) finally: session.close() LOGGER.info("delete_collector_by_id method execution finished.") # Example Usage def main(): CRDB_SQL_PORT = "26257" Loading Loading @@ -282,3 +317,11 @@ def main(): # Get filtered rows from Collector table filtered_collector_rows = db_manager.get_filtered_collector_rows(collector='Collector 1') LOGGER.info("Filtered Collector Rows: %s", filtered_collector_rows) # Delete a KPI by kpi_id kpi_id_to_delete = '123e4567-e89b-12d3-a456-426614174000' db_manager.delete_kpi_by_id(kpi_id_to_delete) # Delete a Collector by collector_id collector_id_to_delete = '123e4567-e89b-12d3-a456-426614174001' db_manager.delete_collector_by_id(collector_id_to_delete)