Skip to content
Snippets Groups Projects
Commit 8072cdfb authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes are made to restructure the Telemetry DB operations.

- TelemetryEngine.py is updated.
- TelemetryModel.py is refined.
- Optimized DB operations were added in TelemetryDB.py
parent 485d3178
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
......@@ -12,21 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
import sqlalchemy
import logging
import sqlalchemy_utils
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryEngine import TelemetryEngine
from telemetry.database.TelemetryModel import Base
from common.method_wrappers.ServiceExceptions import (
OperationFailedException, AlreadyExistsException )
LOGGER = logging.getLogger(__name__)
DB_NAME = "telemetryfrontend"
DB_NAME = "tfs_telemetry"
# # Create a base class for declarative models
# Base = declarative_base()
class managementDB:
class TelemetryDBmanager:
def __init__(self):
self.db_engine = TelemetryEngine.get_engine()
if self.db_engine is None:
......@@ -35,54 +32,50 @@ class managementDB:
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
if not sqlalchemy_utils.database_exists(engine.url):
LOGGER.info("Database created. {:}".format(engine.url))
sqlalchemy_utils.create_database(engine.url)
def create_database(self):
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
# def create_database(self):
# try:
# with self.db_engine.connect() as connection:
# connection.execute(f"CREATE DATABASE {self.db_name};")
# LOGGER.info('managementDB initalizes database. Name: {self.db_name}')
# return True
# except:
# LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
# return False
@staticmethod
def create_tables(engine : sqlalchemy.engine.Engine):
def create_tables(self):
try:
Base.metadata.create_all(engine) # type: ignore
LOGGER.info("Tables created in the DB Name: {:}".format(DB_NAME))
CollectorModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
except Exception as e:
LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
with self.db_engine.connect() as connection:
result = connection.execute("SHOW TABLES;")
tables = result.fetchall() # type: ignore
LOGGER.info("Tables verified: {:}".format(tables))
tables = result.fetchall()
LOGGER.info("Tables in DB: {:}".format(tables))
except Exception as e:
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
@staticmethod
# ----------------- CRUD METHODs ---------------------
def add_row_to_db(self, row):
    """Insert a single ORM row and commit.

    Returns True on success. Raises AlreadyExistsException when the insert
    violates a unique key, and OperationFailedException for any other DB
    error. The session is always closed, even on failure.
    """
    session = self.Session()
    try:
        session.add(row)
        session.commit()
        LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
        return True
    except Exception as e:
        session.rollback()
        # psycopg2 reports unique-key violations only via the message text,
        # so detect them by substring and map to the dedicated exception.
        if "psycopg2.errors.UniqueViolation" in str(e):
            LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
            raise AlreadyExistsException(row.__class__.__name__, row,
                                         extra_details=["Unique key violation: {:}".format(e)])
        else:
            LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
            # BUGFIX: message previously said "Deletion by column id" (copy-paste).
            raise OperationFailedException("Insertion failed", extra_details=["unable to insert row {:}".format(e)])
    finally:
        session.close()
......@@ -91,15 +84,16 @@ class managementDB:
try:
entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
if entity:
LOGGER.info(f"{model.__name__} ID found: {str(entity)}")
# LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
return entity
else:
LOGGER.warning(f"{model.__name__} ID not found: {str(id_to_search)}")
LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.info(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
session.close()
......@@ -110,29 +104,34 @@ class managementDB:
if record:
session.delete(record)
session.commit()
LOGGER.info("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
else:
LOGGER.warning("%s with %s %s not found", model.__name__, col_name, id_to_search)
LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
return None
except Exception as e:
session.rollback()
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def select_with_filter(self, model, **filters):
def select_with_filter(self, model, filter_object):
session = self.Session()
try:
query = session.query(model)
for column, value in filters.items():
query = query.filter(getattr(model, column) == value) # type: ignore
query = session.query(CollectorModel)
# Apply filters based on the filter_object
if filter_object.kpi_id:
query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
result = query.all()
if result:
LOGGER.info(f"Fetched filtered rows from {model.__name__} table with filters: {filters}") # - Results: {result}
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else:
LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filters}")
LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filters} ::: {e}")
return []
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
finally:
session.close()
\ No newline at end of file
session.close()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
import sqlalchemy
from sqlalchemy import inspect, MetaData, Table
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from sqlalchemy.exc import SQLAlchemyError
from telemetry.database.TelemetryModel import Base
LOGGER = logging.getLogger(__name__)
DB_NAME = "telemetryfrontend"
class TelemetryDBmanager:
    """Manager for the telemetry frontend database.

    Wraps schema lifecycle (create/verify/drop of the database and its tables)
    and CRUD operations for the Kpi and Collector ORM models. Every DB
    operation opens a short-lived Session and closes it in a ``finally`` block.
    """

    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            # BUGFIX: the original did `return False`, which makes Python raise
            # "TypeError: __init__() should return None". Bail out instead; the
            # object remains unusable but construction no longer crashes oddly.
            return
        self.db_name = DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)

    def create_database(self):
        """Create the database if missing. Returns True on success, False on failure."""
        try:
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('TelemetryDBmanager initialized DB Name: {:}'.format(self.db_name))
            return True
        except Exception as e:  # pragma: no cover
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e)))
            return False

    def create_tables(self):
        """Create every table declared on the shared declarative Base."""
        try:
            Base.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Tables created in database ({:}) as per Models".format(self.db_name))
        except Exception as e:
            # BUGFIX: failure was previously logged at INFO level.
            LOGGER.error("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))

    def verify_tables(self):
        """Log the tables currently present in the database (CockroachDB `SHOW TABLES`)."""
        try:
            with self.db_engine.connect() as connection:
                result = connection.execute("SHOW TABLES;")
                tables = result.fetchall()
                LOGGER.info("Tables in DB: {:}".format(tables))
        except Exception as e:
            LOGGER.error("Unable to fetch Table names. {:s}".format(str(e)))

    def drop_table(self, table_to_drop: str):
        """Drop a single table by name, if it exists in the database."""
        try:
            inspector = inspect(self.db_engine)
            existing_tables = inspector.get_table_names()
            if table_to_drop in existing_tables:
                # Reflect the table from the live DB so no model class is needed.
                table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine)
                table.drop(self.db_engine)
                LOGGER.info("Table {:} deleted from DB: {:}".format(table_to_drop, self.db_name))
            else:
                LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME))
        except Exception as e:
            LOGGER.error("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e)))

    def list_databases(self):
        """Log the databases visible through this engine."""
        query = "SHOW DATABASES"
        with self.db_engine.connect() as connection:
            result = connection.execute(query)
            databases = [row[0] for row in result]
            LOGGER.info("List of available DBs: {:}".format(databases))

    # ------------------ INSERT METHODs --------------------------------------

    def inser_kpi(self, request: KpiDescriptor):
        """Insert a Kpi row built from a KpiDescriptor protobuf.

        The misspelled name is kept for caller compatibility; see the
        ``insert_kpi`` alias defined right after this method.
        """
        session = self.Session()
        try:
            kpi_to_insert = KpiModel()
            kpi_to_insert.kpi_id          = request.kpi_id.kpi_id.uuid
            kpi_to_insert.kpi_description = request.kpi_description
            kpi_to_insert.kpi_sample_type = request.kpi_sample_type
            # BUGFIX: these four assignments were scrambled in the original
            # (device_id <- service_id, endpoint_id <- device_id,
            #  service_id <- slice_id, slice_id <- endpoint_id).
            kpi_to_insert.device_id       = request.device_id.device_uuid.uuid
            kpi_to_insert.endpoint_id     = request.endpoint_id.endpoint_uuid.uuid
            kpi_to_insert.service_id      = request.service_id.service_uuid.uuid
            kpi_to_insert.slice_id        = request.slice_id.slice_uuid.uuid
            kpi_to_insert.connection_id   = request.connection_id.connection_uuid.uuid
            # kpi_to_insert.link_id       = request.link_id.link_id.uuid
            session.add(kpi_to_insert)
            session.commit()
            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id))
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to insert new kpi. {:s}".format(str(e)))
        finally:
            session.close()

    # Backward-compatible, correctly-spelled alias for inser_kpi.
    insert_kpi = inser_kpi

    def insert_collector(self, request: Collector):
        """Insert a Collector row built from a Collector protobuf."""
        session = self.Session()
        try:
            collector_to_insert = CollectorModel()
            collector_to_insert.collector_id        = request.collector_id.collector_id.uuid
            collector_to_insert.kpi_id              = request.kpi_id.kpi_id.uuid
            # TODO(review): description is hard-coded; confirm whether the
            # protobuf carries a real description field that should map here.
            collector_to_insert.collector           = "Test collector description"
            collector_to_insert.sampling_duration_s = request.duration_s
            collector_to_insert.sampling_interval_s = request.interval_s
            collector_to_insert.start_timestamp     = time.time()
            collector_to_insert.end_timestamp       = time.time()
            session.add(collector_to_insert)
            session.commit()
            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id))
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to insert new collector. {:s}".format(str(e)))
        finally:
            session.close()

    # ------------------ GET METHODs --------------------------------------

    def get_kpi_descriptor(self, request: KpiId):
        """Return the Kpi row matching the given KpiId, or None; re-raises DB errors."""
        session = self.Session()
        try:
            kpi_id_to_search = request.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first()
            if kpi:
                LOGGER.info("kpi ID found: {:s}".format(str(kpi)))
                return kpi
            else:
                LOGGER.warning("Kpi ID not found {:s}".format(str(kpi_id_to_search)))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to retrieve KPI ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()

    def get_collector(self, request: CollectorId):
        """Return the Collector row matching the given CollectorId, or None; re-raises DB errors."""
        session = self.Session()
        try:
            collector_id_to_search = request.collector_id.uuid
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first()
            if collector:
                LOGGER.info("collector ID found: {:s}".format(str(collector)))
                return collector
            else:
                LOGGER.warning("collector ID not found {:s}".format(str(collector_id_to_search)))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to retrieve collector ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()

    # ------------------ SELECT METHODs --------------------------------------

    def select_kpi_descriptor(self, **filters):
        """Return Kpi rows matching equality filters given as column=value kwargs."""
        session = self.Session()
        try:
            query = session.query(KpiModel)
            for column, value in filters.items():
                query = query.filter(getattr(KpiModel, column) == value)
            result = query.all()
            if result:
                LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found : {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()

    def select_collector(self, **filters):
        """Return Collector rows matching equality filters given as column=value kwargs."""
        session = self.Session()
        try:
            query = session.query(CollectorModel)
            for column, value in filters.items():
                query = query.filter(getattr(CollectorModel, column) == value)
            result = query.all()
            # BUGFIX: log messages previously said "KPI table" (copy-paste).
            if result:
                LOGGER.info("Fetched filtered rows from Collector table with filters : {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found : {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from Collector table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()

    # ------------------ DELETE METHODs --------------------------------------

    def delete_kpi_descriptor(self, request: KpiId):
        """Delete the Kpi row matching the given KpiId, if present."""
        session = self.Session()
        try:
            kpi_id_to_delete = request.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first()
            if kpi:
                session.delete(kpi)
                session.commit()
                LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete)
            else:
                LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e)
        finally:
            session.close()

    def delete_collector(self, request: CollectorId):
        """Delete the Collector row matching the given CollectorId, if present."""
        session = self.Session()
        try:
            collector_id_to_delete = request.collector_id.uuid
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first()
            if collector:
                session.delete(collector)
                session.commit()
                LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete)
            else:
                LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e)
        finally:
            session.close()
\ No newline at end of file
......@@ -12,34 +12,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy, sqlalchemy_utils
# from common.Settings import get_setting
import logging, sqlalchemy
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
APP_NAME = 'tfs'
ECHO = False # False: No dump SQL commands and transactions executed
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class TelemetryEngine:
# def __init__(self):
# self.engine = self.get_engine()
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
CRDB_NAMESPACE = "crdb"
CRDB_SQL_PORT = "26257"
CRDB_DATABASE = "telemetryfrontend"
CRDB_USERNAME = "tfs"
CRDB_PASSWORD = "tfs123"
CRDB_SSLMODE = "require"
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
# crdb_uri = CRDB_URI_TEMPLATE.format(
# CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = "crdb"
CRDB_SQL_PORT = "26257"
CRDB_DATABASE = "telemetryfrontend"
CRDB_USERNAME = "tfs"
CRDB_PASSWORD = "tfs123"
CRDB_SSLMODE = "require"
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
# engine = sqlalchemy.create_engine(
# crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
engine = sqlalchemy.create_engine(crdb_uri, echo=False)
LOGGER.info(' TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri))
except: # pylint: disable=bare-except # pragma: no cover
......@@ -47,13 +41,4 @@ class TelemetryEngine:
return None # type: ignore
return engine # type: ignore
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
    """Create the database at engine.url if it does not already exist; no-op otherwise."""
    if not sqlalchemy_utils.database_exists(engine.url):
        sqlalchemy_utils.create_database(engine.url)
        # BUGFIX: log after the create call so the message reflects reality
        # (the original logged "Database created" before creating it).
        LOGGER.info("Database created. {:}".format(engine.url))
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
    """Drop the database at engine.url when it exists; silently do nothing otherwise."""
    url = engine.url
    if sqlalchemy_utils.database_exists(url):
        sqlalchemy_utils.drop_database(url)
......@@ -14,9 +14,7 @@
import logging
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, String, Float
from sqlalchemy.orm import registry
logging.basicConfig(level=logging.INFO)
......@@ -24,22 +22,23 @@ LOGGER = logging.getLogger(__name__)
# Create a base class for declarative models
Base = registry().generate_base()
# Base = declarative_base()
class Collector(Base):
    """ORM model for one telemetry collector row (table 'collector')."""
    __tablename__ = 'collector'

    # BUGFIX: the original defined every non-key column twice (diff residue);
    # only the later, nullable=False definitions take effect, so keep those.
    collector_id         = Column(UUID(as_uuid=False), primary_key=True)
    kpi_id               = Column(UUID(as_uuid=False), nullable=False)
    # NOTE(review): attribute keeps the original 'decription' misspelling
    # because it is part of the model's public interface (used by __repr__
    # and external callers) — renaming would break them.
    collector_decription = Column(String , nullable=False)
    sampling_duration_s  = Column(Float  , nullable=False)
    sampling_interval_s  = Column(Float  , nullable=False)
    start_timestamp      = Column(Float  , nullable=False)
    end_timestamp        = Column(Float  , nullable=False)

    # helps in logging the information
    def __repr__(self):
        return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
                f"collector='{self.collector_decription}', sampling_duration_s='{self.sampling_duration_s}', "
                f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', "
                f"end_timestamp='{self.end_timestamp}')>")
\ No newline at end of file
f"end_timestamp='{self.end_timestamp}')>")
# add method to convert gRPC requests to rows if necessary...
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment