# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, time
from sqlalchemy import engine
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine

LOGGER = logging.getLogger(__name__)

# Create a base class for declarative models
# NOTE(review): the Collector/Kpi models are imported from TelemetryModel and
# therefore cannot be registered on this Base; it is kept because other
# modules may import it from here -- confirm whether it is still needed.
Base = declarative_base()


class TelemetryDB:
    """Small helper around the telemetry SQLAlchemy engine.

    Obtains the engine from ``TelemetryEngine`` and offers helpers to create
    the database/tables and to insert or look up Collector and KPI rows.
    Every row-level operation uses its own short-lived session.
    """

    def __init__(self):
        """Fetch the SQLAlchemy engine from ``TelemetryEngine``.

        Raises:
            RuntimeError: if ``TelemetryEngine.get_engine()`` returns ``None``.
                The previous code executed ``return False`` here, which Python
                turns into an unrelated ``TypeError`` because ``__init__``
                must return ``None``; the failure is now reported explicitly.
        """
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            raise RuntimeError('Unable to get SQLAlchemy DB Engine...')
        LOGGER.info('test_telemetry_DB_connection -- Engine created sucessfully')

    def _new_session(self):
        """Return a new session bound to ``self.db_engine``; the caller closes it."""
        session_factory = sessionmaker(bind=self.db_engine)
        return session_factory()

    def create_database(self):
        """Create the database through ``TelemetryEngine.create_database``.

        Returns:
            ``True`` when the call completes, ``False`` when it raises (the
            exception is logged with its traceback).
        """
        try:
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('test_telemetry_DB_connection -- DB created sucessfully')
            return True
        # Exception (not a bare except) so Ctrl-C / SystemExit still propagate.
        except Exception:  # pylint: disable=broad-except # pragma: no cover
            LOGGER.exception('Failed to check/create the database: %s', self.db_engine.url)
            return False

    # Function to create the collector and KPI tables in the database
    def create_tables(self):
        """Create the Collector and KPI tables; failures are logged, not raised."""
        try:
            # The module-level Base has no models registered on it, so its
            # metadata alone creates nothing. Also use the metadata the
            # imported models carry.
            # NOTE(review): assumes KpiModel/CollectorModel are declarative
            # classes exposing `.metadata` -- confirm against TelemetryModel.
            metadatas = [Base.metadata]
            for model in (KpiModel, CollectorModel):
                model_metadata = getattr(model, 'metadata', None)
                if model_metadata is None:
                    continue
                if all(model_metadata is not known for known in metadatas):
                    metadatas.append(model_metadata)
            for metadata in metadatas:
                metadata.create_all(self.db_engine)  # type: ignore
            LOGGER.info("Collector and KPI tables created in the TelemetryFrontend database")
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.error("Tables cannot be created in the TelemetryFrontend database. \n%s", e)

    # Function to insert a row into the Collector model
    def insert_collector(self, kpi_id: int, collector: str, duration_s: float, interval_s: float):
        """Insert one Collector row.

        Args:
            kpi_id: value stored in the row's ``kpi_id`` field.
            collector: value stored in the row's ``collector`` field.
            duration_s: value stored in ``sampling_duration_s``.
            interval_s: value stored in ``sampling_interval_s``.

        Any exception is rolled back and logged; nothing is raised or returned.
        """
        session = self._new_session()
        try:
            collector_obj = CollectorModel()
            collector_obj.kpi_id = kpi_id
            collector_obj.collector = collector
            collector_obj.sampling_duration_s = duration_s
            collector_obj.sampling_interval_s = interval_s
            # NOTE(review): both timestamps are set to "now"; presumably the
            # end timestamp should reflect the sampling duration -- confirm.
            collector_obj.start_timestamp = time.time()
            collector_obj.end_timestamp = time.time()
            session.add(collector_obj)
            session.commit()
            LOGGER.info("New collector inserted successfully")
        except Exception as e:  # pylint: disable=broad-except
            session.rollback()
            LOGGER.error("Failed to insert new collector. %s", e)
        finally:
            session.close()

    def insert_kpi(self, kpi_id, kpi_descriptor):
        """Insert one KPI row.

        Args:
            kpi_id: value stored in the row's ``kpi_id`` field.
            kpi_descriptor: value stored in the row's ``kpi_description`` field.

        Any exception is rolled back and logged; nothing is raised or returned.
        """
        session = self._new_session()
        try:
            kpi_obj = KpiModel()
            kpi_obj.kpi_id = kpi_id
            kpi_obj.kpi_description = kpi_descriptor
            session.add(kpi_obj)
            session.commit()
            # These messages used to say "collector" (copy-paste slip).
            LOGGER.info("New KPI inserted successfully")
        except Exception as e:  # pylint: disable=broad-except
            session.rollback()
            LOGGER.error("Failed to insert new KPI. %s", e)
        finally:
            session.close()

    def inser_kpi(self, kpi_id, kpi_descriptor):
        """Backward-compatible alias for :meth:`insert_kpi` (original misspelled name)."""
        return self.insert_kpi(kpi_id, kpi_descriptor)

    def get_kpi(self, kpi_id):
        """Return the first KPI row whose ``kpi_id`` matches, or ``None``.

        Raises:
            Exception: any error raised by the query is logged and re-raised.
        """
        session = self._new_session()
        try:
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id).first()
            if kpi is None:
                LOGGER.info("Kpi ID not found")
                return None
            LOGGER.info("kpi ID found: %s", kpi)
            return kpi
        except Exception as e:
            LOGGER.error("Failed to retrieve KPI ID. %s", e)
            raise
        finally:
            session.close()