Commit fdeccb2b authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Kpi and Telemetry Basic operation: (Add, filter and select)

parent d0ee5bea
Loading
Loading
Loading
Loading
+148 −0
Original line number Diff line number Diff line
@@ -13,87 +13,100 @@
# limitations under the License.

import logging, time
from sqlalchemy import engine
from sqlalchemy import inspect
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.telemetry_frontend_pb2 import Collector
from sqlalchemy.exc import SQLAlchemyError


LOGGER = logging.getLogger(__name__)
DB_NAME = "TelemetryFrontend"

# Create a base class for declarative models
Base = declarative_base()

class TelemetryDB:
class TelemetryDBmanager:
    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            return False
        LOGGER.info('test_telemetry_DB_connection -- Engine created sucessfully')
        self.db_name = DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)

    def create_database(self):
        try:
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('test_telemetry_DB_connection -- DB created sucessfully')
            with self.db_engine.connect() as connection:
                connection.execute(f"CREATE DATABASE {self.db_name};")
            LOGGER.info('TelemetryDBmanager initalized DB Name: {self.db_name}')
            return True
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
            return False

    # Function to create the collector and KPI tables in the database
    def create_tables(self):
        try:
            Base.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Collector and KPI tables created in the TelemetryFrontend database")
            LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))

    # Function to insert a row into the Collector model
    def insert_collector(self, kpi_id: int, collector: str, duration_s: float, interval_s: float):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
    def verify_tables(self):
        try:
            # Create a new Collector instance
            collectorObj                     = CollectorModel()
            collectorObj.kpi_id              = kpi_id
            collectorObj.collector           = collector
            collectorObj.sampling_duration_s = duration_s
            collectorObj.sampling_interval_s = interval_s
            collectorObj.start_timestamp     = time.time()
            collectorObj.end_timestamp       = time.time()
            with self.db_engine.connect() as connection:
                result = connection.execute("SHOW TABLES;")
                tables = result.fetchall()
                LOGGER.info("Tables verified: {:}".format(tables))
        except Exception as e:
            LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))

    def inser_kpi(self, request: KpiDescriptor):
        session = self.Session()
        try:
            # Create a new Collector instance
            kpi_to_insert                 = KpiModel()
            kpi_to_insert.kpi_id          = request.kpi_id.kpi_id.uuid
            kpi_to_insert.kpi_description = request.kpi_description
            kpi_to_insert.kpi_sample_type = request.kpi_sample_type
            kpi_to_insert.device_id       = request.service_id.service_uuid.uuid 
            kpi_to_insert.endpoint_id     = request.device_id.device_uuid.uuid 
            kpi_to_insert.service_id      = request.slice_id.slice_uuid.uuid 
            kpi_to_insert.slice_id        = request.endpoint_id.endpoint_uuid.uuid
            kpi_to_insert.connection_id   = request.connection_id.connection_uuid.uuid
            # kpi_to_insert.link_id         = request.link_id.link_id.uuid
            # Add the instance to the session
            session.add(collectorObj)
            
            # Commit the session
            session.add(kpi_to_insert)
            session.commit()
            LOGGER.info("New collector inserted successfully")
            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert))
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
            LOGGER.info("Failed to insert new kpi. {:s}".format(str(e)))
        finally:
            # Close the session
            session.close()

    def inser_kpi(self, kpi_id, kpi_descriptor):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
    # Function to insert a row into the Collector model
    def insert_collector(self, request: Collector):
        session = self.Session()
        try:
            # Create a new Collector instance
            KpiObj                 = KpiModel()
            KpiObj.kpi_id          = kpi_id
            KpiObj.kpi_description = kpi_descriptor
            collector_to_insert                     = CollectorModel()
            collector_to_insert.collector_id        = request.collector_id.collector_id.uuid
            collector_to_insert.kpi_id              = request.kpi_id.kpi_id.uuid  
            collector_to_insert.collector           = "Test collector description"
            collector_to_insert.sampling_duration_s = request.duration_s
            collector_to_insert.sampling_interval_s = request.interval_s
            collector_to_insert.start_timestamp     = time.time()
            collector_to_insert.end_timestamp       = time.time()
            
            # Add the instance to the session
            session.add(KpiObj)
            
            # Commit the session
            session.add(collector_to_insert)
            session.commit()
            LOGGER.info("New collector inserted successfully")
            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert))
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
@@ -101,22 +114,35 @@ class TelemetryDB:
            # Close the session
            session.close()

    def get_kpi(self, kpi_id):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
    def get_kpi_descriptor(self, kpi_id: KpiId):
        session = self.Session()
        try:
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id).first()
            
            kpi_id_to_get = kpi_id.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_get).first()
            if kpi:
                LOGGER.info("kpi ID found: {:s}".format(str(kpi)))
                return kpi
            else:
                LOGGER.info("Kpi ID not found")
                LOGGER.info("Kpi ID not found{:s}".format(str(kpi_id_to_get)))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e)))
            raise
        finally:
            # Close the session
            session.close()
    
    def select_kpi_descriptor(self, **filters):
        session = self.Session()
        try:
            query = session.query(KpiModel)
            for column, value in filters.items():
                query = query.filter(getattr(KpiModel, column) == value)
            result = query.all()
            LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()
 No newline at end of file
+4 −2
Original line number Diff line number Diff line
@@ -39,8 +39,10 @@ class TelemetryEngine:
        # crdb_uri = CRDB_URI_TEMPLATE.format(
        #         CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            engine = sqlalchemy.create_engine(
                crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
            # engine = sqlalchemy.create_engine(
            #     crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
            engine = sqlalchemy.create_engine(crdb_uri)
            LOGGER.info(' --- TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri))
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
+28 −16
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
# limitations under the License.

import logging
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
@@ -25,9 +26,9 @@ LOGGER = logging.getLogger(__name__)
Base = declarative_base()

class Kpi(Base):
    __tablename__ = 'KPI'
    __tablename__ = 'kpi'

    kpi_id = Column(Integer, primary_key=True, autoincrement=True)
    kpi_id          = Column(UUID(as_uuid=False), primary_key=True)
    kpi_description = Column(Text)
    kpi_sample_type = Column(Integer)
    device_id       = Column(String)
@@ -36,16 +37,24 @@ class Kpi(Base):
    slice_id        = Column(String)
    connection_id   = Column(String)
    link_id         = Column(String)
    monitor_flag = Column(String)
    # monitor_flag    = Column(String)

    # Relationship to Collector model: allows access to related Collector objects from a Kpi object
    collectors = relationship('Collector', back_populates='kpi')

    # helps in logging the information
    def __repr__(self):
        return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', "
                f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', "
                f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', "
                f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', "
                f"link_id='{self.link_id}')>")
    
class Collector(Base):
    __tablename__ = 'collector'

    collector_id = Column(Integer, primary_key=True, autoincrement=True)
    kpi_id = Column(Integer, ForeignKey('KPI.kpi_id'))
    collector_id        = Column(UUID(as_uuid=False), primary_key=True)
    kpi_id              = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id'))
    collector           = Column(String)
    sampling_duration_s = Column(Float)
    sampling_interval_s = Column(Float)
@@ -55,5 +64,8 @@ class Collector(Base):
    # Relationship to Kpi model: allows access to the related Kpi object from a Collector object
    kpi = relationship('Kpi', back_populates='collectors')


    def __repr__(self):
        return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
                f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', "
                f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', "
                f"end_timestamp='{self.end_timestamp}')>")
 No newline at end of file
+31 −2
Original line number Diff line number Diff line
@@ -15,11 +15,40 @@
import uuid
import random
from common.proto import telemetry_frontend_pb2
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType


def create_collector_request():
    _create_collector_request                                = telemetry_frontend_pb2.Collector()
    _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
    _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
    _create_collector_request.kpi_id.kpi_id.uuid             = '2a779f04-77a6-4b32-b020-893e0e1e656f' # must be primary key in kpi table
    # _create_collector_request.kpi_id.kpi_id.uuid             = str(uuid.uuid4())
    _create_collector_request.duration_s                     = float(random.randint(8, 16))
    _create_collector_request.interval_s                     = float(random.randint(2, 4))
    return _create_collector_request

def create_kpi_request():
    _create_kpi_request                                     = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid                  = str(uuid.uuid4())
    _create_kpi_request.kpi_description                     = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type                     = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.service_id.service_uuid.uuid        = 'SERV' 
    _create_kpi_request.device_id.device_uuid.uuid          = 'DEV'  
    _create_kpi_request.slice_id.slice_uuid.uuid            = 'SLC'  
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid      = 'END'  
    _create_kpi_request.connection_id.connection_uuid.uuid  = 'CON'  
    # _create_kpi_request.link_id.link_id.uuid                = 'LNK'
    return _create_kpi_request

def create_kpi_id_request():
    _create_kpi_id_request             = kpi_manager_pb2.KpiId()
    _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0'
    return _create_kpi_id_request

def create_kpi_filter_request():
    # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply.
    _create_kpi_filter_request                    = dict()
    _create_kpi_filter_request['kpi_sample_type'] = 102
    _create_kpi_filter_request['kpi_id']          = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0'
    return _create_kpi_filter_request
 No newline at end of file
+28 −18
Original line number Diff line number Diff line
@@ -14,27 +14,37 @@
# limitations under the License.

import logging
from telemetry.database.TelemetryDB import TelemetryDB
from .messages import create_collector_request
from typing import Any
from telemetry.database.TelemetryDBmanager import TelemetryDBmanager
from telemetry.database.TelemetryEngine import TelemetryEngine
from telemetry.database.tests import temp_DB
from .messages import create_kpi_request, create_collector_request, \
                        create_kpi_id_request, create_kpi_filter_request

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

def test_telemetry_DB_connection():
    LOGGER.info('test_telemetry_DB_connection begin')
    TelemetryDBobj = TelemetryDB()
    if(TelemetryDBobj.create_database()):
        LOGGER.info('test_telemetry_DB_connection -----DB----')
        TelemetryDBobj.create_tables()   # type: ignore
        LOGGER.info('test_telemetry_DB_connection -----Table----')
        TelemetryDBobj.inser_kpi(4, 'this is test kpi')
        LOGGER.info('test_telemetry_DB_connection -----INSERT KPI----')
        TelemetryDBobj.insert_collector(4, "this is test collector", 3.0, 12.0)
        LOGGER.info('test_telemetry_DB_connection -----INSERT COL----')
        TelemetryDBobj.get_kpi(1)
        LOGGER.info('test_telemetry_DB_connection -----GET KPI----')



# def test_temp_DB():
#     temp_DB.main()

def test_telemetry_object_creation():
    LOGGER.info('--- test_telemetry_object_creation: START')
    LOGGER.info('>>> Creating TelemetryDBmanager Object: ')
    TelemetryDBmanagerObj = TelemetryDBmanager()
    # LOGGER.info('>>> Creating Tables: ')
    # TelemetryDBmanagerObj.create_tables()
    # LOGGER.info('>>> Verifing Table creation: ')
    # TelemetryDBmanagerObj.verify_tables()
    LOGGER.info('>>> Row Insertion Operation: kpi Table')
    kpi_obj = create_kpi_request()
    TelemetryDBmanagerObj.inser_kpi(kpi_obj)
    LOGGER.info('>>> Row Insertion Operation: collector Table')
    collector_obj = create_collector_request()
    TelemetryDBmanagerObj.insert_collector(collector_obj)
    LOGGER.info('>>> Get KpiDescriptor ')
    kpi_id_obj = create_kpi_id_request()
    TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj)
    kpi_filter : dict[str, Any] = create_kpi_filter_request()
    TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter)

    
 No newline at end of file
Loading