# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging, time import sqlalchemy from sqlalchemy import inspect, MetaData, Table from sqlalchemy.orm import sessionmaker from telemetry.database.TelemetryModel import Collector as CollectorModel from telemetry.database.TelemetryModel import Kpi as KpiModel from sqlalchemy.ext.declarative import declarative_base from telemetry.database.TelemetryEngine import TelemetryEngine from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from sqlalchemy.exc import SQLAlchemyError from telemetry.database.TelemetryModel import Base LOGGER = logging.getLogger(__name__) DB_NAME = "telemetryfrontend" class TelemetryDBmanager: def __init__(self): self.db_engine = TelemetryEngine.get_engine() if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') return False self.db_name = DB_NAME self.Session = sessionmaker(bind=self.db_engine) def create_database(self): try: # with self.db_engine.connect() as connection: # connection.execute(f"CREATE DATABASE {self.db_name};") TelemetryEngine.create_database(self.db_engine) LOGGER.info('TelemetryDBmanager initalized DB Name: {:}'.format(self.db_name)) return True except Exception as e: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e))) return False def create_tables(self): try: Base.metadata.create_all(self.db_engine) # type: ignore LOGGER.info("Tables created in database ({:}) the as per Models".format(self.db_name)) except Exception as e: LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) def verify_tables(self): try: with self.db_engine.connect() as connection: result = connection.execute("SHOW TABLES;") tables = result.fetchall() LOGGER.info("Tables in DB: {:}".format(tables)) except Exception as e: LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) def drop_table(self, table_to_drop: str): try: inspector = inspect(self.db_engine) existing_tables = inspector.get_table_names() if table_to_drop in existing_tables: table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine) table.drop(self.db_engine) LOGGER.info("Tables delete in the DB Name: {:}".format(self.db_name)) else: LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME)) except Exception as e: LOGGER.info("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e))) def list_databases(self): query = "SHOW DATABASES" with self.db_engine.connect() as connection: result = connection.execute(query) databases = [row[0] for row in result] LOGGER.info("List of available DBs: {:}".format(databases)) # ------------------ INSERT METHODs -------------------------------------- def inser_kpi(self, request: KpiDescriptor): session = self.Session() try: # Create a new Kpi instance kpi_to_insert = KpiModel() kpi_to_insert.kpi_id = request.kpi_id.kpi_id.uuid kpi_to_insert.kpi_description = request.kpi_description kpi_to_insert.kpi_sample_type = request.kpi_sample_type kpi_to_insert.device_id = request.service_id.service_uuid.uuid kpi_to_insert.endpoint_id = request.device_id.device_uuid.uuid kpi_to_insert.service_id = request.slice_id.slice_uuid.uuid kpi_to_insert.slice_id = request.endpoint_id.endpoint_uuid.uuid kpi_to_insert.connection_id = request.connection_id.connection_uuid.uuid # kpi_to_insert.link_id = request.link_id.link_id.uuid # Add the instance to the session session.add(kpi_to_insert) session.commit() LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id)) except Exception as e: session.rollback() LOGGER.info("Failed to insert new kpi. {:s}".format(str(e))) finally: # Close the session session.close() # Function to insert a row into the Collector model def insert_collector(self, request: Collector): session = self.Session() try: # Create a new Collector instance collector_to_insert = CollectorModel() collector_to_insert.collector_id = request.collector_id.collector_id.uuid collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid collector_to_insert.collector = "Test collector description" collector_to_insert.sampling_duration_s = request.duration_s collector_to_insert.sampling_interval_s = request.interval_s collector_to_insert.start_timestamp = time.time() collector_to_insert.end_timestamp = time.time() session.add(collector_to_insert) session.commit() LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id)) except Exception as e: session.rollback() LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) finally: # Close the session session.close() # ------------------ GET METHODs -------------------------------------- def get_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_search = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first() if kpi: LOGGER.info("kpi ID found: {:s}".format(str(kpi))) return kpi else: LOGGER.warning("Kpi ID not found {:s}".format(str(kpi_id_to_search))) return None except Exception as e: session.rollback() LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e))) raise finally: session.close() def get_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_search = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first() if collector: LOGGER.info("collector ID found: {:s}".format(str(collector))) return collector else: LOGGER.warning("collector ID not found{:s}".format(str(collector_id_to_search))) return None except Exception as e: session.rollback() LOGGER.info("Failed to retrieve collector ID. {:s}".format(str(e))) raise finally: session.close() # ------------------ SELECT METHODs -------------------------------------- def select_kpi_descriptor(self, **filters): session = self.Session() try: query = session.query(KpiModel) for column, value in filters.items(): query = query.filter(getattr(KpiModel, column) == value) result = query.all() if len(result) != 0: LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) else: LOGGER.warning("No matching row found : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) return [] finally: session.close() def select_collector(self, **filters): session = self.Session() try: query = session.query(CollectorModel) for column, value in filters.items(): query = query.filter(getattr(CollectorModel, column) == value) result = query.all() if len(result) != 0: LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) else: LOGGER.warning("No matching row found : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) return [] finally: session.close() # ------------------ DELETE METHODs -------------------------------------- def delete_kpi_descriptor(self, request: KpiId): session = self.Session() try: kpi_id_to_delete = request.kpi_id.uuid kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first() if kpi: session.delete(kpi) session.commit() LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete) else: LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e) finally: session.close() def delete_collector(self, request: CollectorId): session = self.Session() try: collector_id_to_delete = request.collector_id.uuid collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first() if collector: session.delete(collector) session.commit() LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete) else: LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete) except SQLAlchemyError as e: session.rollback() LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e) finally: session.close()