Skip to content
Snippets Groups Projects
Commit d0ee5bea authored by Waleed Akbar's avatar Waleed Akbar
Browse files

CRDB working example of all operations

parent f2c91300
Branches
Tags
2 merge requests!294Release TeraFlowSDN 4.0,!207Resolve "(CTTC) Separation of Monitoring"
from sqlalchemy import create_engine, Column, String, Integer, Text, Float, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.dialects.postgresql import UUID
import logging
LOGGER = logging.getLogger(__name__)
Base = declarative_base()
class Kpi(Base):
__tablename__ = 'kpi'
kpi_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_description = Column(Text)
kpi_sample_type = Column(Integer)
device_id = Column(String)
endpoint_id = Column(String)
service_id = Column(String)
slice_id = Column(String)
connection_id = Column(String)
link_id = Column(String)
collectors = relationship('Collector', back_populates='kpi')
def __repr__(self):
return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', "
f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', "
f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', "
f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', "
f"link_id='{self.link_id}')>")
class Collector(Base):
__tablename__ = 'collector'
collector_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id'))
collector = Column(String)
sampling_duration_s = Column(Float)
sampling_interval_s = Column(Float)
start_timestamp = Column(Float)
end_timestamp = Column(Float)
kpi = relationship('Kpi', back_populates='collectors')
def __repr__(self):
return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', "
f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', "
f"end_timestamp='{self.end_timestamp}')>")
class DatabaseManager:
def __init__(self, db_url, db_name):
self.engine = create_engine(db_url)
self.db_name = db_name
self.Session = sessionmaker(bind=self.engine)
LOGGER.info("DatabaseManager initialized with DB URL: %s and DB Name: %s", db_url, db_name)
def create_database(self):
try:
with self.engine.connect() as connection:
connection.execute(f"CREATE DATABASE {self.db_name};")
LOGGER.info("Database '%s' created successfully.", self.db_name)
except Exception as e:
LOGGER.error("Error creating database '%s': %s", self.db_name, e)
finally:
LOGGER.info("create_database method execution finished.")
def create_tables(self):
try:
Base.metadata.create_all(self.engine)
LOGGER.info("Tables created successfully.")
except Exception as e:
LOGGER.error("Error creating tables: %s", e)
finally:
LOGGER.info("create_tables method execution finished.")
def verify_table_creation(self):
try:
with self.engine.connect() as connection:
result = connection.execute("SHOW TABLES;")
tables = result.fetchall()
LOGGER.info("Tables verified: %s", tables)
return tables
except Exception as e:
LOGGER.error("Error verifying table creation: %s", e)
return []
finally:
LOGGER.info("verify_table_creation method execution finished.")
def insert_row_kpi(self, kpi_data):
session = self.Session()
try:
new_kpi = Kpi(**kpi_data)
session.add(new_kpi)
session.commit()
LOGGER.info("Inserted row into KPI table: %s", kpi_data)
except Exception as e:
session.rollback()
LOGGER.error("Error inserting row into KPI table: %s", e)
finally:
session.close()
LOGGER.info("insert_row_kpi method execution finished.")
def insert_row_collector(self, collector_data):
session = self.Session()
try:
new_collector = Collector(**collector_data)
session.add(new_collector)
session.commit()
LOGGER.info("Inserted row into Collector table: %s", collector_data)
except Exception as e:
session.rollback()
LOGGER.error("Error inserting row into Collector table: %s", e)
finally:
session.close()
LOGGER.info("insert_row_collector method execution finished.")
def verify_insertion_kpi(self, kpi_id):
session = self.Session()
try:
kpi = session.query(Kpi).filter_by(kpi_id=kpi_id).first()
LOGGER.info("Verified insertion in KPI table for kpi_id: %s, Result: %s", kpi_id, kpi)
return kpi
except Exception as e:
LOGGER.error("Error verifying insertion in KPI table for kpi_id %s: %s", kpi_id, e)
return None
finally:
session.close()
LOGGER.info("verify_insertion_kpi method execution finished.")
def verify_insertion_collector(self, collector_id):
session = self.Session()
try:
collector = session.query(Collector).filter_by(collector_id=collector_id).first()
LOGGER.info("Verified insertion in Collector table for collector_id: %s, Result: %s", collector_id, collector)
return collector
except Exception as e:
LOGGER.error("Error verifying insertion in Collector table for collector_id %s: %s", collector_id, e)
return None
finally:
session.close()
LOGGER.info("verify_insertion_collector method execution finished.")
def get_all_kpi_rows(self):
session = self.Session()
try:
kpi_rows = session.query(Kpi).all()
LOGGER.info("Fetched all rows from KPI table: %s", kpi_rows)
return kpi_rows
except Exception as e:
LOGGER.error("Error fetching all rows from KPI table: %s", e)
return []
finally:
session.close()
LOGGER.info("get_all_kpi_rows method execution finished.")
def get_all_collector_rows(self):
session = self.Session()
try:
collector_rows = session.query(Collector).all()
LOGGER.info("Fetched all rows from Collector table: %s", collector_rows)
return collector_rows
except Exception as e:
LOGGER.error("Error fetching all rows from Collector table: %s", e)
return []
finally:
session.close()
LOGGER.info("get_all_collector_rows method execution finished.")
def get_filtered_kpi_rows(self, **filters):
session = self.Session()
try:
query = session.query(Kpi)
for column, value in filters.items():
query = query.filter(getattr(Kpi, column) == value)
result = query.all()
LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result)))
return result
except NoResultFound:
LOGGER.warning("No results found in KPI table with filters %s", filters)
return []
except Exception as e:
LOGGER.error("Error fetching filtered rows from KPI table with filters %s: %s", filters, e)
return []
finally:
session.close()
LOGGER.info("get_filtered_kpi_rows method execution finished.")
def get_filtered_collector_rows(self, **filters):
session = self.Session()
try:
query = session.query(Collector)
for column, value in filters.items():
query = query.filter(getattr(Collector, column) == value)
result = query.all()
LOGGER.info("Fetched filtered rows from Collector table with filters %s: %s", filters, result)
return result
except NoResultFound:
LOGGER.warning("No results found in Collector table with filters %s", filters)
return []
except Exception as e:
LOGGER.error("Error fetching filtered rows from Collector table with filters %s: %s", filters, e)
return []
finally:
session.close()
LOGGER.info("get_filtered_collector_rows method execution finished.")
# Example Usage
def main():
CRDB_SQL_PORT = "26257"
CRDB_DATABASE = "TelemetryFrontend"
CRDB_USERNAME = "tfs"
CRDB_PASSWORD = "tfs123"
CRDB_SSLMODE = "require"
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
# db_url = "cockroachdb://username:password@localhost:26257/"
# db_name = "yourdatabase"
db_manager = DatabaseManager(crdb_uri, CRDB_DATABASE)
# Create database
# db_manager.create_database()
# Update db_url to include the new database name
db_manager.engine = create_engine(f"{crdb_uri}")
db_manager.Session = sessionmaker(bind=db_manager.engine)
# Create tables
db_manager.create_tables()
# Verify table creation
tables = db_manager.verify_table_creation()
LOGGER.info('Tables in the database: {:s}'.format(str(tables)))
# Insert a row into the KPI table
kpi_data = {
'kpi_id': '123e4567-e89b-12d3-a456-426614174100',
'kpi_description': 'Sample KPI',
'kpi_sample_type': 1,
'device_id': 'device_1',
'endpoint_id': 'endpoint_1',
'service_id': 'service_1',
'slice_id': 'slice_1',
'connection_id': 'conn_1',
'link_id': 'link_1'
}
db_manager.insert_row_kpi(kpi_data)
# Insert a row into the Collector table
collector_data = {
'collector_id': '123e4567-e89b-12d3-a456-426614174101',
'kpi_id': '123e4567-e89b-12d3-a456-426614174000',
'collector': 'Collector 1',
'sampling_duration_s': 60.0,
'sampling_interval_s': 10.0,
'start_timestamp': 1625247600.0,
'end_timestamp': 1625247660.0
}
db_manager.insert_row_collector(collector_data)
# Verify insertion into KPI table
kpi = db_manager.verify_insertion_kpi('123e4567-e89b-12d3-a456-426614174000')
print("Inserted KPI:", kpi)
# Verify insertion into Collector table
collector = db_manager.verify_insertion_collector('123e4567-e89b-12d3-a456-426614174001')
print("Inserted Collector:", collector)
# Get all rows from KPI table
all_kpi_rows = db_manager.get_all_kpi_rows()
LOGGER.info("All KPI Rows: %s", all_kpi_rows)
# Get all rows from Collector table
all_collector_rows = db_manager.get_all_collector_rows()
LOGGER.info("All Collector Rows: %s", all_collector_rows)
# Get filtered rows from KPI table
filtered_kpi_rows = db_manager.get_filtered_kpi_rows(kpi_description='Sample KPI')
LOGGER.info("Filtered KPI Rows: %s", filtered_kpi_rows)
# Get filtered rows from Collector table
filtered_collector_rows = db_manager.get_filtered_collector_rows(collector='Collector 1')
LOGGER.info("Filtered Collector Rows: %s", filtered_collector_rows)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment