diff --git a/src/telemetry/database/TelemetryDB.py b/src/telemetry/database/TelemetryDB.py deleted file mode 100644 index 5ce722af548d88b77cb8326b54cd69b8d7433e89..0000000000000000000000000000000000000000 --- a/src/telemetry/database/TelemetryDB.py +++ /dev/null @@ -1,122 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, time -from sqlalchemy import engine -from sqlalchemy.orm import sessionmaker -from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.TelemetryModel import Kpi as KpiModel -from sqlalchemy.ext.declarative import declarative_base -from telemetry.database.TelemetryEngine import TelemetryEngine - -LOGGER = logging.getLogger(__name__) - -# Create a base class for declarative models -Base = declarative_base() - -class TelemetryDB: - def __init__(self): - self.db_engine = TelemetryEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - LOGGER.info('test_telemetry_DB_connection -- Engine created sucessfully') - - def create_database(self): - try: - TelemetryEngine.create_database(self.db_engine) - LOGGER.info('test_telemetry_DB_connection -- DB created sucessfully') - return True - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) - return False - - # Function to create the collector and KPI tables in the database - def create_tables(self): - try: - Base.metadata.create_all(self.db_engine) # type: ignore - LOGGER.info("Collector and KPI tables created in the TelemetryFrontend database") - except Exception as e: - LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) - - # Function to insert a row into the Collector model - def insert_collector(self, kpi_id: int, collector: str, duration_s: float, interval_s: float): - # Create a session - Session = sessionmaker(bind=self.db_engine) - session = Session() - try: - # Create a new Collector instance - collectorObj = CollectorModel() - collectorObj.kpi_id = kpi_id - collectorObj.collector = collector - collectorObj.sampling_duration_s = duration_s - collectorObj.sampling_interval_s = interval_s - collectorObj.start_timestamp = time.time() - collectorObj.end_timestamp = time.time() - - # Add the instance to the session - session.add(collectorObj) - - # Commit the session - session.commit() - LOGGER.info("New collector inserted successfully") - except Exception as e: - session.rollback() - LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) - finally: - # Close the session - session.close() - - def inser_kpi(self, kpi_id, kpi_descriptor): - # Create a session - Session = sessionmaker(bind=self.db_engine) - session = Session() - try: - # Create a new Collector instance - KpiObj = KpiModel() - KpiObj.kpi_id = kpi_id - KpiObj.kpi_description = kpi_descriptor - - # Add the instance to the session - session.add(KpiObj) - - # Commit the session - session.commit() - LOGGER.info("New collector inserted successfully") - except Exception as e: - session.rollback() - LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) - finally: - # Close the session - session.close() - - def get_kpi(self, kpi_id): - # Create a session - Session = sessionmaker(bind=self.db_engine) - session = Session() - try: - kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id).first() - - if kpi: - LOGGER.info("kpi ID found: {:s}".format(str(kpi))) - return kpi - else: - LOGGER.info("Kpi ID not found") - return None - except Exception as e: - LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e))) - raise - finally: - # Close the session - session.close() \ No newline at end of file diff --git a/src/telemetry/database/TelemetryDBmanager.py b/src/telemetry/database/TelemetryDBmanager.py new file mode 100644 index 0000000000000000000000000000000000000000..42d647e0d660a2040b4d025f5316c6e073c70e1e --- /dev/null +++ b/src/telemetry/database/TelemetryDBmanager.py @@ -0,0 +1,148 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, time +from sqlalchemy import inspect +from sqlalchemy.orm import sessionmaker +from telemetry.database.TelemetryModel import Collector as CollectorModel +from telemetry.database.TelemetryModel import Kpi as KpiModel +from sqlalchemy.ext.declarative import declarative_base +from telemetry.database.TelemetryEngine import TelemetryEngine +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId +from common.proto.telemetry_frontend_pb2 import Collector +from sqlalchemy.exc import SQLAlchemyError + + +LOGGER = logging.getLogger(__name__) +DB_NAME = "TelemetryFrontend" + +# Create a base class for declarative models +Base = declarative_base() + +class TelemetryDBmanager: + def __init__(self): + self.db_engine = TelemetryEngine.get_engine() + if self.db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return False + self.db_name = DB_NAME + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): + try: + with self.db_engine.connect() as connection: + connection.execute(f"CREATE DATABASE {self.db_name};") + LOGGER.info('TelemetryDBmanager initalized DB Name: {self.db_name}') + return True + except: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) + return False + + def create_tables(self): + try: + Base.metadata.create_all(self.db_engine) # type: ignore + LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name)) + except Exception as e: + LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) + + def verify_tables(self): + try: + with self.db_engine.connect() as connection: + result = connection.execute("SHOW TABLES;") + tables = result.fetchall() + LOGGER.info("Tables verified: {:}".format(tables)) + except Exception as e: + LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + + def inser_kpi(self, request: KpiDescriptor): + session = self.Session() + try: + # Create a new Collector instance + kpi_to_insert = KpiModel() + kpi_to_insert.kpi_id = request.kpi_id.kpi_id.uuid + kpi_to_insert.kpi_description = request.kpi_description + kpi_to_insert.kpi_sample_type = request.kpi_sample_type + kpi_to_insert.device_id = request.service_id.service_uuid.uuid + kpi_to_insert.endpoint_id = request.device_id.device_uuid.uuid + kpi_to_insert.service_id = request.slice_id.slice_uuid.uuid + kpi_to_insert.slice_id = request.endpoint_id.endpoint_uuid.uuid + kpi_to_insert.connection_id = request.connection_id.connection_uuid.uuid + # kpi_to_insert.link_id = request.link_id.link_id.uuid + # Add the instance to the session + session.add(kpi_to_insert) + session.commit() + LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert)) + except Exception as e: + session.rollback() + LOGGER.info("Failed to insert new kpi. {:s}".format(str(e))) + finally: + # Close the session + session.close() + + # Function to insert a row into the Collector model + def insert_collector(self, request: Collector): + session = self.Session() + try: + # Create a new Collector instance + collector_to_insert = CollectorModel() + collector_to_insert.collector_id = request.collector_id.collector_id.uuid + collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid + collector_to_insert.collector = "Test collector description" + collector_to_insert.sampling_duration_s = request.duration_s + collector_to_insert.sampling_interval_s = request.interval_s + collector_to_insert.start_timestamp = time.time() + collector_to_insert.end_timestamp = time.time() + + session.add(collector_to_insert) + session.commit() + LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert)) + except Exception as e: + session.rollback() + LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) + finally: + # Close the session + session.close() + + def get_kpi_descriptor(self, kpi_id: KpiId): + session = self.Session() + try: + kpi_id_to_get = kpi_id.kpi_id.uuid + kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_get).first() + if kpi: + LOGGER.info("kpi ID found: {:s}".format(str(kpi))) + return kpi + else: + LOGGER.info("Kpi ID not found{:s}".format(str(kpi_id_to_get))) + return None + except Exception as e: + session.rollback() + LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e))) + raise + finally: + session.close() + + def select_kpi_descriptor(self, **filters): + session = self.Session() + try: + query = session.query(KpiModel) + for column, value in filters.items(): + query = query.filter(getattr(KpiModel, column) == value) + result = query.all() + LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) + return result + except SQLAlchemyError as e: + LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) + return [] + finally: + session.close() \ No newline at end of file diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py index 1884368bd51658738eb5a65d65b8bdda177229b5..d6e54cc2f9a3a07a339196deb87f566fef0d5775 100644 --- a/src/telemetry/database/TelemetryEngine.py +++ b/src/telemetry/database/TelemetryEngine.py @@ -39,8 +39,10 @@ class TelemetryEngine: # crdb_uri = CRDB_URI_TEMPLATE.format( # CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: - engine = sqlalchemy.create_engine( - crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) + # engine = sqlalchemy.create_engine( + # crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) + engine = sqlalchemy.create_engine(crdb_uri) + LOGGER.info(' --- TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri)) except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore diff --git a/src/telemetry/database/TelemetryModel.py b/src/telemetry/database/TelemetryModel.py index 1f40bad56917036c326751f0b6873551d4133ce2..8defdd2e8005288860c7a78e8f2306b95f9d5e26 100644 --- a/src/telemetry/database/TelemetryModel.py +++ b/src/telemetry/database/TelemetryModel.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from sqlalchemy.dialects.postgresql import UUID from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship @@ -25,35 +26,46 @@ LOGGER = logging.getLogger(__name__) Base = declarative_base() class Kpi(Base): - __tablename__ = 'KPI' + __tablename__ = 'kpi' - kpi_id = Column(Integer, primary_key=True, autoincrement=True) + kpi_id = Column(UUID(as_uuid=False), primary_key=True) kpi_description = Column(Text) kpi_sample_type = Column(Integer) - device_id = Column(String) - endpoint_id = Column(String) - service_id = Column(String) - slice_id = Column(String) - connection_id = Column(String) - link_id = Column(String) - monitor_flag = Column(String) + device_id = Column(String) + endpoint_id = Column(String) + service_id = Column(String) + slice_id = Column(String) + connection_id = Column(String) + link_id = Column(String) + # monitor_flag = Column(String) # Relationship to Collector model: allows access to related Collector objects from a Kpi object collectors = relationship('Collector', back_populates='kpi') + # helps in logging the information + def __repr__(self): + return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', " + f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', " + f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', " + f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', " + f"link_id='{self.link_id}')>") + class Collector(Base): __tablename__ = 'collector' - collector_id = Column(Integer, primary_key=True, autoincrement=True) - kpi_id = Column(Integer, ForeignKey('KPI.kpi_id')) - collector = Column(String) + collector_id = Column(UUID(as_uuid=False), primary_key=True) + kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id')) + collector = Column(String) sampling_duration_s = Column(Float) sampling_interval_s = Column(Float) - start_timestamp = Column(Float) - end_timestamp = Column(Float) + start_timestamp = Column(Float) + end_timestamp = Column(Float) # Relationship to Kpi model: allows access to the related Kpi object from a Collector object kpi = relationship('Kpi', back_populates='collectors') - - + def __repr__(self): + return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', " + f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', " + f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', " + f"end_timestamp='{self.end_timestamp}')>") \ No newline at end of file diff --git a/src/telemetry/database/tests/messages.py b/src/telemetry/database/tests/messages.py index 911abcdc976f4dd78ce31fb10ecc6558c62f593b..ea59d09251934ea0785d5367afea4919c8d05979 100644 --- a/src/telemetry/database/tests/messages.py +++ b/src/telemetry/database/tests/messages.py @@ -15,11 +15,40 @@ import uuid import random from common.proto import telemetry_frontend_pb2 +from common.proto import kpi_manager_pb2 +from common.proto.kpi_sample_types_pb2 import KpiSampleType + def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_collector_request.kpi_id.kpi_id.uuid = '2a779f04-77a6-4b32-b020-893e0e1e656f' # must be primary key in kpi table + # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) - return _create_collector_request \ No newline at end of file + return _create_collector_request + +def create_kpi_request(): + _create_kpi_request = kpi_manager_pb2.KpiDescriptor() + _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + # _create_kpi_request.link_id.link_id.uuid = 'LNK' + return _create_kpi_request + +def create_kpi_id_request(): + _create_kpi_id_request = kpi_manager_pb2.KpiId() + _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' + return _create_kpi_id_request + +def create_kpi_filter_request(): + # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. + _create_kpi_filter_request = dict() + _create_kpi_filter_request['kpi_sample_type'] = 102 + _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' + return _create_kpi_filter_request \ No newline at end of file diff --git a/src/telemetry/database/tests/telemetryDBtests.py b/src/telemetry/database/tests/telemetryDBtests.py index 0d0977bced8db73f9e1286a08a62eba186efcebe..217bdcfd484fcea2ae80ad45997799e44fac4fdc 100644 --- a/src/telemetry/database/tests/telemetryDBtests.py +++ b/src/telemetry/database/tests/telemetryDBtests.py @@ -14,27 +14,37 @@ # limitations under the License. import logging -from telemetry.database.TelemetryDB import TelemetryDB -from .messages import create_collector_request +from typing import Any +from telemetry.database.TelemetryDBmanager import TelemetryDBmanager +from telemetry.database.TelemetryEngine import TelemetryEngine +from telemetry.database.tests import temp_DB +from .messages import create_kpi_request, create_collector_request, \ + create_kpi_id_request, create_kpi_filter_request logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) -def test_telemetry_DB_connection(): - LOGGER.info('test_telemetry_DB_connection begin') - TelemetryDBobj = TelemetryDB() - if(TelemetryDBobj.create_database()): - LOGGER.info('test_telemetry_DB_connection -----DB----') - TelemetryDBobj.create_tables() # type: ignore - LOGGER.info('test_telemetry_DB_connection -----Table----') - TelemetryDBobj.inser_kpi(4, 'this is test kpi') - LOGGER.info('test_telemetry_DB_connection -----INSERT KPI----') - TelemetryDBobj.insert_collector(4, "this is test collector", 3.0, 12.0) - LOGGER.info('test_telemetry_DB_connection -----INSERT COL----') - TelemetryDBobj.get_kpi(1) - LOGGER.info('test_telemetry_DB_connection -----GET KPI----') - - - +# def test_temp_DB(): +# temp_DB.main() + +def test_telemetry_object_creation(): + LOGGER.info('--- test_telemetry_object_creation: START') + LOGGER.info('>>> Creating TelemetryDBmanager Object: ') + TelemetryDBmanagerObj = TelemetryDBmanager() + # LOGGER.info('>>> Creating Tables: ') + # TelemetryDBmanagerObj.create_tables() + # LOGGER.info('>>> Verifing Table creation: ') + # TelemetryDBmanagerObj.verify_tables() + LOGGER.info('>>> Row Insertion Operation: kpi Table') + kpi_obj = create_kpi_request() + TelemetryDBmanagerObj.inser_kpi(kpi_obj) + LOGGER.info('>>> Row Insertion Operation: collector Table') + collector_obj = create_collector_request() + TelemetryDBmanagerObj.insert_collector(collector_obj) + LOGGER.info('>>> Get KpiDescriptor ') + kpi_id_obj = create_kpi_id_request() + TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj) + kpi_filter : dict[str, Any] = create_kpi_filter_request() + TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter) \ No newline at end of file