"""SQLAlchemy models and a small CRUD helper for the ``kpi`` and ``collector`` tables.

The module declares two ORM models (``Kpi`` and ``Collector``), a
``DatabaseManager`` that wraps database/table creation and basic
insert/select/delete operations, and an example ``main`` that exercises them.
"""

import logging

from sqlalchemy import create_engine, Column, String, Integer, Text, Float, ForeignKey, text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

LOGGER = logging.getLogger(__name__)

Base = declarative_base()


class Kpi(Base):
    """ORM model mapped to the ``kpi`` table."""

    __tablename__ = 'kpi'

    kpi_id = Column(UUID(as_uuid=False), primary_key=True)
    kpi_description = Column(Text)
    kpi_sample_type = Column(Integer)
    device_id = Column(String)
    endpoint_id = Column(String)
    service_id = Column(String)
    slice_id = Column(String)
    connection_id = Column(String)
    link_id = Column(String)

    # One Kpi row may be referenced by many Collector rows (see Collector.kpi_id).
    collectors = relationship('Collector', back_populates='kpi')

    def __repr__(self):
        # Only column attributes are rendered; the ``collectors`` relationship
        # is left out so that repr() never triggers a lazy load.
        return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', "
                f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', "
                f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', "
                f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', "
                f"link_id='{self.link_id}')>")


class Collector(Base):
    """ORM model mapped to the ``collector`` table."""

    __tablename__ = 'collector'

    collector_id = Column(UUID(as_uuid=False), primary_key=True)
    kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id'))
    collector = Column(String)
    sampling_duration_s = Column(Float)
    sampling_interval_s = Column(Float)
    start_timestamp = Column(Float)
    end_timestamp = Column(Float)

    # Many-to-one side of Kpi.collectors.
    kpi = relationship('Kpi', back_populates='collectors')

    def __repr__(self):
        # Only column attributes are rendered; the ``kpi`` relationship is
        # left out so that repr() never triggers a lazy load.
        return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
                f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', "
                f"sampling_interval_s='{self.sampling_interval_s}', "
                f"start_timestamp='{self.start_timestamp}', end_timestamp='{self.end_timestamp}')>")


class DatabaseManager:
    """Thin wrapper around an engine and session factory for the KPI/Collector tables.

    Every method opens its own connection or session and logs the outcome.
    Unless noted otherwise, errors are logged and swallowed rather than raised.
    """

    def __init__(self, db_url, db_name):
        """Create the engine and session factory.

        Args:
            db_url: SQLAlchemy database URL passed to ``create_engine``.
            db_name: Name of the database used by ``create_database``.
        """
        self.engine = create_engine(db_url)
        self.db_name = db_name
        self.Session = sessionmaker(bind=self.engine)
        # Log repr() of the parsed URL instead of the raw string: the repr
        # masks the password, so credentials do not end up in the logs.
        LOGGER.info("DatabaseManager initialized with DB URL: %r and DB Name: %s", self.engine.url, db_name)

    def create_database(self):
        """Issue ``CREATE DATABASE`` for ``self.db_name``; failures are logged, not raised."""
        try:
            # Quote the name as an identifier so unusual database names cannot
            # break (or inject into) the statement.
            quoted_name = self.engine.dialect.identifier_preparer.quote(self.db_name)
            with self.engine.connect() as connection:
                # AUTOCOMMIT so the statement is not left in an open transaction
                # that is rolled back when the connection closes.
                autocommit_conn = connection.execution_options(isolation_level="AUTOCOMMIT")
                # Raw SQL must be wrapped in text(); plain strings are rejected
                # by Connection.execute in SQLAlchemy 2.0.
                autocommit_conn.execute(text(f"CREATE DATABASE {quoted_name};"))
            LOGGER.info("Database '%s' created successfully.", self.db_name)
        except Exception as e:
            LOGGER.error("Error creating database '%s': %s", self.db_name, e)
        finally:
            LOGGER.info("create_database method execution finished.")

    def create_tables(self):
        """Create every table declared on ``Base.metadata``; failures are logged, not raised."""
        try:
            Base.metadata.create_all(self.engine)
            LOGGER.info("Tables created successfully.")
        except Exception as e:
            LOGGER.error("Error creating tables: %s", e)
        finally:
            LOGGER.info("create_tables method execution finished.")

    def verify_table_creation(self):
        """Run ``SHOW TABLES`` and return the fetched rows.

        Returns:
            The rows returned by the statement, or ``[]`` if it failed.
        """
        try:
            with self.engine.connect() as connection:
                result = connection.execute(text("SHOW TABLES;"))
                tables = result.fetchall()
                LOGGER.info("Tables verified: %s", tables)
                return tables
        except Exception as e:
            LOGGER.error("Error verifying table creation: %s", e)
            return []
        finally:
            LOGGER.info("verify_table_creation method execution finished.")

    def insert_row_kpi(self, kpi_data):
        """Insert one ``Kpi`` row built from ``kpi_data``.

        Args:
            kpi_data: Mapping of ``Kpi`` column names to values.

        On failure the session is rolled back and the error is logged.
        """
        session = self.Session()
        try:
            new_kpi = Kpi(**kpi_data)
            session.add(new_kpi)
            session.commit()
            LOGGER.info("Inserted row into KPI table: %s", kpi_data)
        except Exception as e:
            session.rollback()
            LOGGER.error("Error inserting row into KPI table: %s", e)
        finally:
            session.close()
            LOGGER.info("insert_row_kpi method execution finished.")

    def insert_row_collector(self, collector_data):
        """Insert one ``Collector`` row built from ``collector_data``.

        Args:
            collector_data: Mapping of ``Collector`` column names to values.

        On failure the session is rolled back and the error is logged.
        """
        session = self.Session()
        try:
            new_collector = Collector(**collector_data)
            session.add(new_collector)
            session.commit()
            LOGGER.info("Inserted row into Collector table: %s", collector_data)
        except Exception as e:
            session.rollback()
            LOGGER.error("Error inserting row into Collector table: %s", e)
        finally:
            session.close()
            LOGGER.info("insert_row_collector method execution finished.")

    def verify_insertion_kpi(self, kpi_id):
        """Look up a ``Kpi`` row by primary key.

        Returns:
            The matching ``Kpi`` instance, or ``None`` if it does not exist or
            the query failed.
        """
        session = self.Session()
        try:
            kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first()
            LOGGER.info("Verified insertion in KPI table for kpi_id: %s, Result: %s", kpi_id, kpi)
            return kpi
        except Exception as e:
            LOGGER.error("Error verifying insertion in KPI table for kpi_id %s: %s", kpi_id, e)
            return None
        finally:
            session.close()
            LOGGER.info("verify_insertion_kpi method execution finished.")

    def verify_insertion_collector(self, collector_id):
        """Look up a ``Collector`` row by primary key.

        Returns:
            The matching ``Collector`` instance, or ``None`` if it does not
            exist or the query failed.
        """
        session = self.Session()
        try:
            collector = session.query(Collector).filter_by(collector_id=collector_id).first()
            LOGGER.info("Verified insertion in Collector table for collector_id: %s, Result: %s",
                        collector_id, collector)
            return collector
        except Exception as e:
            LOGGER.error("Error verifying insertion in Collector table for collector_id %s: %s",
                         collector_id, e)
            return None
        finally:
            session.close()
            LOGGER.info("verify_insertion_collector method execution finished.")

    def get_all_kpi_rows(self):
        """Return every row of the KPI table, or ``[]`` if the query failed."""
        session = self.Session()
        try:
            kpi_rows = session.query(Kpi).all()
            LOGGER.info("Fetched all rows from KPI table: %s", kpi_rows)
            return kpi_rows
        except Exception as e:
            LOGGER.error("Error fetching all rows from KPI table: %s", e)
            return []
        finally:
            session.close()
            LOGGER.info("get_all_kpi_rows method execution finished.")

    def get_all_collector_rows(self):
        """Return every row of the Collector table, or ``[]`` if the query failed."""
        session = self.Session()
        try:
            collector_rows = session.query(Collector).all()
            LOGGER.info("Fetched all rows from Collector table: %s", collector_rows)
            return collector_rows
        except Exception as e:
            LOGGER.error("Error fetching all rows from Collector table: %s", e)
            return []
        finally:
            session.close()
            LOGGER.info("get_all_collector_rows method execution finished.")

    def get_filtered_kpi_rows(self, **filters):
        """Return ``Kpi`` rows whose columns equal the given keyword values.

        Args:
            **filters: ``column_name=value`` pairs, combined with AND.

        Returns:
            The matching rows; ``[]`` when nothing matches or the query failed
            (an unknown column name is one such failure).
        """
        session = self.Session()
        try:
            query = session.query(Kpi)
            for column, value in filters.items():
                query = query.filter(getattr(Kpi, column) == value)
            result = query.all()
            # Query.all() returns an empty list rather than raising when
            # nothing matches, so the "no results" case is detected here.
            if result:
                LOGGER.info("Fetched filtered rows from KPI table with filters %s: %s", filters, result)
            else:
                LOGGER.warning("No results found in KPI table with filters %s", filters)
            return result
        except Exception as e:
            LOGGER.error("Error fetching filtered rows from KPI table with filters %s: %s", filters, e)
            return []
        finally:
            session.close()
            LOGGER.info("get_filtered_kpi_rows method execution finished.")

    def get_filtered_collector_rows(self, **filters):
        """Return ``Collector`` rows whose columns equal the given keyword values.

        Args:
            **filters: ``column_name=value`` pairs, combined with AND.

        Returns:
            The matching rows; ``[]`` when nothing matches or the query failed
            (an unknown column name is one such failure).
        """
        session = self.Session()
        try:
            query = session.query(Collector)
            for column, value in filters.items():
                query = query.filter(getattr(Collector, column) == value)
            result = query.all()
            # Query.all() returns an empty list rather than raising when
            # nothing matches, so the "no results" case is detected here.
            if result:
                LOGGER.info("Fetched filtered rows from Collector table with filters %s: %s", filters, result)
            else:
                LOGGER.warning("No results found in Collector table with filters %s", filters)
            return result
        except Exception as e:
            LOGGER.error("Error fetching filtered rows from Collector table with filters %s: %s", filters, e)
            return []
        finally:
            session.close()
            LOGGER.info("get_filtered_collector_rows method execution finished.")

    def delete_kpi_by_id(self, kpi_id):
        """Delete the ``Kpi`` row with the given id, if it exists.

        A missing row is logged as a warning. ``SQLAlchemyError`` is rolled
        back and logged; any other exception propagates after the session is
        closed.
        """
        session = self.Session()
        try:
            kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first()
            if kpi:
                session.delete(kpi)
                session.commit()
                LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id)
            else:
                LOGGER.warning("KPI with kpi_id %s not found", kpi_id)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id, e)
        finally:
            session.close()
            LOGGER.info("delete_kpi_by_id method execution finished.")

    def delete_collector_by_id(self, collector_id):
        """Delete the ``Collector`` row with the given id, if it exists.

        A missing row is logged as a warning. ``SQLAlchemyError`` is rolled
        back and logged; any other exception propagates after the session is
        closed.
        """
        session = self.Session()
        try:
            collector = session.query(Collector).filter_by(collector_id=collector_id).first()
            if collector:
                session.delete(collector)
                session.commit()
                LOGGER.info("Deleted Collector with collector_id: %s", collector_id)
            else:
                LOGGER.warning("Collector with collector_id %s not found", collector_id)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting Collector with collector_id %s: %s", collector_id, e)
        finally:
            session.close()
            LOGGER.info("delete_collector_by_id method execution finished.")


# Example Usage
def main():
    """Exercise ``DatabaseManager`` end to end against a local database."""
    # NOTE: example credentials only; do not hard-code real secrets.
    CRDB_SQL_PORT = "26257"
    CRDB_DATABASE = "telemetryfrontend"
    CRDB_USERNAME = "tfs"
    CRDB_PASSWORD = "tfs123"
    CRDB_SSLMODE = "require"
    CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
    crdb_uri = CRDB_URI_TEMPLATE.format(
        CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)

    # The same ids are used for insert, verify and delete so that the
    # collector's foreign key points at the KPI row that was actually inserted.
    kpi_id = '123e4567-e89b-12d3-a456-426614174000'
    collector_id = '123e4567-e89b-12d3-a456-426614174001'

    db_manager = DatabaseManager(crdb_uri, CRDB_DATABASE)

    # Create database
    db_manager.create_database()

    # Rebuild the engine and session factory after the database exists
    # (crdb_uri already names the database, so the URL itself is unchanged).
    db_manager.engine = create_engine(crdb_uri)
    db_manager.Session = sessionmaker(bind=db_manager.engine)

    # Create tables
    db_manager.create_tables()

    # Verify table creation
    tables = db_manager.verify_table_creation()
    LOGGER.info('Tables in the database: %s', tables)

    # Insert a row into the KPI table
    kpi_data = {
        'kpi_id': kpi_id,
        'kpi_description': 'Sample KPI',
        'kpi_sample_type': 1,
        'device_id': 'device_1',
        'endpoint_id': 'endpoint_1',
        'service_id': 'service_1',
        'slice_id': 'slice_1',
        'connection_id': 'conn_1',
        'link_id': 'link_1',
    }
    db_manager.insert_row_kpi(kpi_data)

    # Insert a row into the Collector table
    collector_data = {
        'collector_id': collector_id,
        'kpi_id': kpi_id,
        'collector': 'Collector 1',
        'sampling_duration_s': 60.0,
        'sampling_interval_s': 10.0,
        'start_timestamp': 1625247600.0,
        'end_timestamp': 1625247660.0,
    }
    db_manager.insert_row_collector(collector_data)

    # Verify insertion into KPI table
    kpi = db_manager.verify_insertion_kpi(kpi_id)
    print("Inserted KPI:", kpi)

    # Verify insertion into Collector table
    collector = db_manager.verify_insertion_collector(collector_id)
    print("Inserted Collector:", collector)

    # Get all rows from KPI table
    all_kpi_rows = db_manager.get_all_kpi_rows()
    LOGGER.info("All KPI Rows: %s", all_kpi_rows)

    # Get all rows from Collector table
    all_collector_rows = db_manager.get_all_collector_rows()
    LOGGER.info("All Collector Rows: %s", all_collector_rows)

    # Get filtered rows from KPI table
    filtered_kpi_rows = db_manager.get_filtered_kpi_rows(kpi_description='Sample KPI')
    LOGGER.info("Filtered KPI Rows: %s", filtered_kpi_rows)

    # Get filtered rows from Collector table
    filtered_collector_rows = db_manager.get_filtered_collector_rows(collector='Collector 1')
    LOGGER.info("Filtered Collector Rows: %s", filtered_collector_rows)

    # Delete a KPI by kpi_id
    db_manager.delete_kpi_by_id(kpi_id)

    # Delete a Collector by collector_id
    db_manager.delete_collector_by_id(collector_id)


if __name__ == '__main__':
    # Make the INFO-level progress messages visible when run as a script.
    logging.basicConfig(level=logging.INFO)
    main()