Commit f2c91300 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry crDB creation, basic operation and test

parent 8ce11c89
Loading
Loading
Loading
Loading
+26 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


PROJECTDIR=`pwd`

cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
#     kpi_manager/tests/test_unitary.py

RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-cli-level=INFO --verbose \
    telemetry/database/tests/telemetryDBtests.py
 No newline at end of file
+27 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########################################################################################################################
# Define your deployment settings here
########################################################################################################################

# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"crdb"}

########################################################################################################################
# Automated steps start here
########################################################################################################################

kubectl --namespace $TFS_K8S_NAMESPACE logs cockroachdb-0
+122 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, time
from sqlalchemy import engine
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine

LOGGER = logging.getLogger(__name__)

# Create a base class for declarative models
Base = declarative_base()

class TelemetryDB:
    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            return False
        LOGGER.info('test_telemetry_DB_connection -- Engine created sucessfully')

    def create_database(self):
        try:
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('test_telemetry_DB_connection -- DB created sucessfully')
            return True
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
            return False
        
    # Function to create the collector and KPI tables in the database
    def create_tables(self):
        try:
            Base.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Collector and KPI tables created in the TelemetryFrontend database")
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
                
    # Function to insert a row into the Collector model
    def insert_collector(self, kpi_id: int, collector: str, duration_s: float, interval_s: float):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
        try:
            # Create a new Collector instance
            collectorObj                     = CollectorModel()
            collectorObj.kpi_id              = kpi_id
            collectorObj.collector           = collector
            collectorObj.sampling_duration_s = duration_s
            collectorObj.sampling_interval_s = interval_s
            collectorObj.start_timestamp     = time.time()
            collectorObj.end_timestamp       = time.time()
            
            # Add the instance to the session
            session.add(collectorObj)
            
            # Commit the session
            session.commit()
            LOGGER.info("New collector inserted successfully")
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
        finally:
            # Close the session
            session.close()
    
    def inser_kpi(self, kpi_id, kpi_descriptor):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
        try:
            # Create a new Collector instance
            KpiObj                 = KpiModel()
            KpiObj.kpi_id          = kpi_id
            KpiObj.kpi_description = kpi_descriptor
            
            # Add the instance to the session
            session.add(KpiObj)
            
            # Commit the session
            session.commit()
            LOGGER.info("New collector inserted successfully")
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
        finally:
            # Close the session
            session.close()

    def get_kpi(self, kpi_id):
        # Create a session
        Session = sessionmaker(bind=self.db_engine)
        session = Session()
        try:
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id).first()
            
            if kpi:
                LOGGER.info("kpi ID found: {:s}".format(str(kpi)))
                return kpi
            else:
                LOGGER.info("Kpi ID not found")
                return None
        except Exception as e:
            LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e)))
            raise
        finally:
            # Close the session
            session.close()
 No newline at end of file
+57 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, sqlalchemy, sqlalchemy_utils
# from common.Settings import get_setting

LOGGER = logging.getLogger(__name__)

APP_NAME = 'tfs'
ECHO = False                # False: No dump SQL commands and transactions executed
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

class TelemetryEngine:
    # def __init__(self):
    #     self.engine = self.get_engine()
    @staticmethod
    def get_engine() -> sqlalchemy.engine.Engine:
        CRDB_NAMESPACE = "crdb"
        CRDB_SQL_PORT  = "26257"
        CRDB_DATABASE  = "TelemetryFrontend"
        CRDB_USERNAME  = "tfs"
        CRDB_PASSWORD  = "tfs123"
        CRDB_SSLMODE   = "require"
        crdb_uri = CRDB_URI_TEMPLATE.format(
                CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        # crdb_uri = CRDB_URI_TEMPLATE.format(
        #         CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            engine = sqlalchemy.create_engine(
                crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
        return engine # type: ignore

    @staticmethod
    def create_database(engine : sqlalchemy.engine.Engine) -> None:
        if not sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.create_database(engine.url)

    @staticmethod
    def drop_database(engine : sqlalchemy.engine.Engine) -> None:
        if sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.drop_database(engine.url)
+59 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship


logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

# Create a base class for declarative models
Base = declarative_base()

class Kpi(Base):
    __tablename__ = 'KPI'

    kpi_id = Column(Integer, primary_key=True, autoincrement=True)
    kpi_description = Column(Text)
    kpi_sample_type = Column(Integer)
    device_id = Column(String)
    endpoint_id = Column(String)
    service_id = Column(String)
    slice_id = Column(String)
    connection_id = Column(String)
    link_id = Column(String)
    monitor_flag = Column(String)

    # Relationship to Collector model: allows access to related Collector objects from a Kpi object
    collectors = relationship('Collector', back_populates='kpi')

class Collector(Base):
    __tablename__ = 'collector'

    collector_id = Column(Integer, primary_key=True, autoincrement=True)
    kpi_id = Column(Integer, ForeignKey('KPI.kpi_id'))
    collector = Column(String)
    sampling_duration_s = Column(Float)
    sampling_interval_s = Column(Float)
    start_timestamp = Column(Float)
    end_timestamp = Column(Float)

    # Relationship to Kpi model: allows access to the related Kpi object from a Collector object
    kpi = relationship('Kpi', back_populates='collectors')


Loading