# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time

# SQLAlchemy, telemetry ORM models/engine and KPI-manager protobuf messages
import sqlalchemy
from sqlalchemy import inspect, MetaData, Table
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId

# Telemetry-frontend protobuf messages and the SQLAlchemy base error type
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from sqlalchemy.exc import SQLAlchemyError

# Declarative Base whose metadata is used by create_tables()
from telemetry.database.TelemetryModel import Base
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Database name reported in this module's log messages.
# NOTE(review): presumably the engine targets a database of the same name - confirm.
DB_NAME = "telemetryfrontend"
class TelemetryDBmanager:
    """Helper around the telemetry SQLAlchemy engine.

    Provides schema utilities (create/verify/drop tables, list databases) and
    insert / get / select / delete operations for the ``Kpi`` and ``Collector``
    ORM models. Every data operation opens its own session and closes it
    before returning.
    """

    def __init__(self):
        """Fetch the engine from ``TelemetryEngine`` and build a session factory.

        Raises:
            RuntimeError: if ``TelemetryEngine.get_engine()`` returns ``None``.
        """
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            # ``__init__`` must return None: the former ``return False`` made
            # Python raise an opaque TypeError, so fail explicitly instead.
            raise RuntimeError('Unable to get SQLAlchemy DB Engine...')
        self.db_name = DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)

    # ------------------ SCHEMA METHODs --------------------------------------

    def create_database(self):
        """Create the database by delegating to ``TelemetryEngine.create_database``.

        Returns:
            ``None`` on success, ``False`` if an exception occurred (the
            exception is logged with its traceback and not propagated).
        """
        try:
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('TelemetryDBmanager initalized DB Name: {:}'.format(self.db_name))
        except Exception as e:  # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e)))
            return False

    def create_tables(self):
        """Create every table registered on ``Base.metadata``.

        Failures are logged and swallowed.
        """
        try:
            Base.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Tables created in database ({:}) the as per Models".format(self.db_name))
        except Exception as e:
            LOGGER.error("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))

    def verify_tables(self):
        """Log the result of ``SHOW TABLES`` for the connected database.

        Failures are logged and swallowed.
        """
        try:
            with self.db_engine.connect() as connection:
                # Raw SQL strings must be wrapped in text(); plain strings are
                # deprecated in SQLAlchemy 1.4 and rejected in 2.0.
                result = connection.execute(sqlalchemy.text("SHOW TABLES;"))
                tables = result.fetchall()
            LOGGER.info("Tables in DB: {:}".format(tables))
        except Exception as e:
            LOGGER.error("Unable to fetch Table names. {:s}".format(str(e)))

    def drop_table(self, table_to_drop: str):
        """Drop ``table_to_drop`` if the inspector reports that it exists.

        Args:
            table_to_drop: name of the table to remove.

        A missing table only produces a warning; failures are logged and
        swallowed.
        """
        try:
            inspector = inspect(self.db_engine)
            existing_tables = inspector.get_table_names()
            if table_to_drop in existing_tables:
                # Reflect the table definition so it can be dropped by name.
                table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine)
                table.drop(self.db_engine)
                LOGGER.info("Tables delete in the DB Name: {:}".format(self.db_name))
            else:
                LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME))
        except Exception as e:
            LOGGER.error("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e)))

    def list_databases(self):
        """Log the first column of every row returned by ``SHOW DATABASES``.

        Exceptions raised by the connection or the query propagate to the
        caller.
        """
        query = sqlalchemy.text("SHOW DATABASES")
        with self.db_engine.connect() as connection:
            result = connection.execute(query)
            # Consume the rows while the connection is still open.
            databases = [row[0] for row in result]
        LOGGER.info("List of available DBs: {:}".format(databases))

    # ------------------ INSERT METHODs --------------------------------------

    def inser_kpi(self, request: KpiDescriptor):
        """Insert a row into the Kpi table from a ``KpiDescriptor`` message.

        Args:
            request: descriptor whose identifiers are copied into the row.

        On failure the session is rolled back and the error is logged; no
        exception is propagated.
        """
        session = self.Session()
        try:
            kpi_to_insert = KpiModel()
            kpi_to_insert.kpi_id = request.kpi_id.kpi_id.uuid
            kpi_to_insert.kpi_description = request.kpi_description
            kpi_to_insert.kpi_sample_type = request.kpi_sample_type
            # Each column takes the matching descriptor field. The previous
            # code stored the service UUID as device_id, the device UUID as
            # endpoint_id, the slice UUID as service_id and the endpoint UUID
            # as slice_id.
            kpi_to_insert.device_id = request.device_id.device_uuid.uuid
            kpi_to_insert.endpoint_id = request.endpoint_id.endpoint_uuid.uuid
            kpi_to_insert.service_id = request.service_id.service_uuid.uuid
            kpi_to_insert.slice_id = request.slice_id.slice_uuid.uuid
            kpi_to_insert.connection_id = request.connection_id.connection_uuid.uuid
            session.add(kpi_to_insert)
            session.commit()
            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id))
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to insert new kpi. {:s}".format(str(e)))
        finally:
            session.close()

    # Correctly spelled alias; ``inser_kpi`` is kept for existing callers.
    insert_kpi = inser_kpi

    def insert_collector(self, request: Collector):
        """Insert a row into the Collector table from a ``Collector`` message.

        Args:
            request: message providing the collector id, KPI id, duration
                and interval.

        On failure the session is rolled back and the error is logged; no
        exception is propagated.
        """
        session = self.Session()
        try:
            collector_to_insert = CollectorModel()
            collector_to_insert.collector_id = request.collector_id.collector_id.uuid
            collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid
            # NOTE(review): the description is a hard-coded placeholder and both
            # timestamps are "now" rather than values taken from the request.
            collector_to_insert.collector = "Test collector description"
            collector_to_insert.sampling_duration_s = request.duration_s
            collector_to_insert.sampling_interval_s = request.interval_s
            collector_to_insert.start_timestamp = time.time()
            collector_to_insert.end_timestamp = time.time()
            session.add(collector_to_insert)
            session.commit()
            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id))
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to insert new collector. {:s}".format(str(e)))
        finally:
            session.close()

    # ------------------ GET METHODs --------------------------------------

    def get_kpi_descriptor(self, request: KpiId):
        """Return the Kpi row whose ``kpi_id`` equals ``request.kpi_id.uuid``.

        Returns:
            The first matching ``KpiModel`` instance, or ``None`` if no row
            matches.

        Raises:
            Exception: any error raised during the lookup is logged, the
                session is rolled back, and the error is re-raised.
        """
        session = self.Session()
        try:
            kpi_id_to_search = request.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first()
            if kpi:
                LOGGER.info("kpi ID found: {:s}".format(str(kpi)))
                return kpi
            LOGGER.warning("Kpi ID not found {:s}".format(str(kpi_id_to_search)))
            return None
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to retrieve KPI ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()

    def get_collector(self, request: CollectorId):
        """Return the Collector row whose id equals ``request.collector_id.uuid``.

        Returns:
            The first matching ``CollectorModel`` instance, or ``None`` if no
            row matches.

        Raises:
            Exception: any error raised during the lookup is logged, the
                session is rolled back, and the error is re-raised.
        """
        session = self.Session()
        try:
            collector_id_to_search = request.collector_id.uuid
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first()
            if collector:
                LOGGER.info("collector ID found: {:s}".format(str(collector)))
                return collector
            LOGGER.warning("collector ID not found{:s}".format(str(collector_id_to_search)))
            return None
        except Exception as e:
            session.rollback()
            LOGGER.error("Failed to retrieve collector ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()

    # ------------------ SELECT METHODs --------------------------------------

    def select_kpi_descriptor(self, **filters):
        """Return all Kpi rows matching every ``column=value`` filter.

        Args:
            **filters: ``KpiModel`` attribute names mapped to required values.

        Returns:
            List of matching rows (possibly empty); ``[]`` as well when a
            ``SQLAlchemyError`` occurs (the error is logged).

        Raises:
            AttributeError: if a filter name is not an attribute of
                ``KpiModel``.
        """
        session = self.Session()
        try:
            query = session.query(KpiModel)
            for column, value in filters.items():
                query = query.filter(getattr(KpiModel, column) == value)
            result = query.all()
            if result:
                LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found : {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()

    def select_collector(self, **filters):
        """Return all Collector rows matching every ``column=value`` filter.

        Args:
            **filters: ``CollectorModel`` attribute names mapped to required
                values.

        Returns:
            List of matching rows (possibly empty); ``[]`` as well when a
            ``SQLAlchemyError`` occurs (the error is logged).

        Raises:
            AttributeError: if a filter name is not an attribute of
                ``CollectorModel``.
        """
        session = self.Session()
        try:
            query = session.query(CollectorModel)
            for column, value in filters.items():
                query = query.filter(getattr(CollectorModel, column) == value)
            result = query.all()
            # Messages name the Collector table (they previously said "KPI
            # table", copied from select_kpi_descriptor).
            if result:
                LOGGER.info("Fetched filtered rows from Collector table with filters : {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found : {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from Collector table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()

    # ------------------ DELETE METHODs --------------------------------------

    def delete_kpi_descriptor(self, request: KpiId):
        """Delete the Kpi row whose ``kpi_id`` equals ``request.kpi_id.uuid``.

        A missing row only produces a warning. A ``SQLAlchemyError`` triggers
        a rollback and is logged, not propagated.
        """
        # Resolved before the try block so the name is always bound when the
        # error handler formats its message.
        kpi_id_to_delete = request.kpi_id.uuid
        session = self.Session()
        try:
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first()
            if kpi:
                session.delete(kpi)
                session.commit()
                LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete)
            else:
                LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e)
        finally:
            session.close()

    def delete_collector(self, request: CollectorId):
        """Delete the Collector row whose id equals ``request.collector_id.uuid``.

        A missing row only produces a warning. A ``SQLAlchemyError`` triggers
        a rollback and is logged, not propagated.
        """
        # Resolved before the try block so the name is always bound when the
        # error handler formats its message.
        collector_id_to_delete = request.collector_id.uuid
        session = self.Session()
        try:
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first()
            if collector:
                session.delete(collector)
                session.commit()
                LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete)
            else:
                LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e)
        finally:
            session.close()