Commit 5d3979fa authored by Waleed Akbar

ManagementDB.py is created to manage the DB operations of 'KpiManager' and 'TelemetryManager'.

parent cd73d36e
2 merge requests: !294 Release TeraFlowSDN 4.0, !207 Resolve "(CTTC) Separation of Monitoring"
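For orientation before the diff: the commit introduces a managementDB helper whose rows are plain SQLAlchemy model objects, a create_collector_model_object() test message, and an add_collector_to_db() hook in the frontend servicer. Below is a minimal usage sketch that mirrors the test code added in this commit; the function name insert_example_collector is illustrative only and not part of the commit.

# Minimal usage sketch (illustrative; mirrors telemetry/database/tests/managementDBtests.py
# and create_collector_model_object() from this commit).
import time, uuid
from telemetry.database.managementDB import managementDB
from telemetry.database.TelemetryModel import Collector as CollectorModel

def insert_example_collector():
    db = managementDB()                                   # engine and session factory come from TelemetryEngine
    collector = CollectorModel()                          # plain SQLAlchemy model row
    collector.collector_id        = str(uuid.uuid4())
    collector.kpi_id              = '3a17230d-8e95-4afb-8b21-6965481aee5a'   # must reference an existing kpi row
    collector.collector           = "Test collector description"
    collector.sampling_duration_s = 15
    collector.sampling_interval_s = 3
    collector.start_timestamp     = time.time()
    collector.end_timestamp       = time.time()
    db.add_row_to_db(collector)                           # add + commit; rolls back on error and closes the session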
@@ -24,5 +24,5 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-python3 -m pytest --log-level=INFO --verbose \
+python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \
     telemetry/frontend/tests/test_frontend.py
\ No newline at end of file
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-cli-level=INFO --verbose \
telemetry/database/tests/managementDBtests.py
\ No newline at end of file
@@ -13,7 +13,8 @@
 # limitations under the License.
 import logging, time
-from sqlalchemy import inspect
+import sqlalchemy
+from sqlalchemy import inspect, MetaData, Table
 from sqlalchemy.orm import sessionmaker
 from telemetry.database.TelemetryModel import Collector as CollectorModel
 from telemetry.database.TelemetryModel import Kpi as KpiModel
@@ -22,13 +23,10 @@ from telemetry.database.TelemetryEngine import TelemetryEngine
 from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
 from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
 from sqlalchemy.exc import SQLAlchemyError
+from telemetry.database.TelemetryModel import Base
 LOGGER = logging.getLogger(__name__)
-DB_NAME = "TelemetryFrontend"
-# Create a base class for declarative models
-Base = declarative_base()
+DB_NAME = "telemetryfrontend"
 class TelemetryDBmanager:
     def __init__(self):
@@ -41,18 +39,19 @@ class TelemetryDBmanager:
     def create_database(self):
         try:
-            with self.db_engine.connect() as connection:
-                connection.execute(f"CREATE DATABASE {self.db_name};")
-            LOGGER.info('TelemetryDBmanager initalized DB Name: {self.db_name}')
+            # with self.db_engine.connect() as connection:
+            #     connection.execute(f"CREATE DATABASE {self.db_name};")
+            TelemetryEngine.create_database(self.db_engine)
+            LOGGER.info('TelemetryDBmanager initalized DB Name: {:}'.format(self.db_name))
             return True
-        except: # pylint: disable=bare-except # pragma: no cover
-            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
+        except Exception as e: # pylint: disable=bare-except # pragma: no cover
+            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e)))
             return False
     def create_tables(self):
         try:
             Base.metadata.create_all(self.db_engine) # type: ignore
-            LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
+            LOGGER.info("Tables created in database ({:}) the as per Models".format(self.db_name))
         except Exception as e:
             LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
@@ -61,10 +60,30 @@ class TelemetryDBmanager:
             with self.db_engine.connect() as connection:
                 result = connection.execute("SHOW TABLES;")
                 tables = result.fetchall()
-                LOGGER.info("Tables verified: {:}".format(tables))
+                LOGGER.info("Tables in DB: {:}".format(tables))
         except Exception as e:
             LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
+    def drop_table(self, table_to_drop: str):
+        try:
+            inspector = inspect(self.db_engine)
+            existing_tables = inspector.get_table_names()
+            if table_to_drop in existing_tables:
+                table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine)
+                table.drop(self.db_engine)
+                LOGGER.info("Tables delete in the DB Name: {:}".format(self.db_name))
+            else:
+                LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME))
+        except Exception as e:
+            LOGGER.info("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e)))
+    def list_databases(self):
+        query = "SHOW DATABASES"
+        with self.db_engine.connect() as connection:
+            result = connection.execute(query)
+            databases = [row[0] for row in result]
+        LOGGER.info("List of available DBs: {:}".format(databases))
     # ------------------ INSERT METHODs --------------------------------------
     def inser_kpi(self, request: KpiDescriptor):
@@ -84,7 +103,7 @@ class TelemetryDBmanager:
             # Add the instance to the session
             session.add(kpi_to_insert)
             session.commit()
-            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert))
+            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id))
         except Exception as e:
             session.rollback()
             LOGGER.info("Failed to insert new kpi. {:s}".format(str(e)))
@@ -108,7 +127,7 @@ class TelemetryDBmanager:
             session.add(collector_to_insert)
             session.commit()
-            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert))
+            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id))
         except Exception as e:
             session.rollback()
             LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
@@ -163,7 +182,10 @@ class TelemetryDBmanager:
             for column, value in filters.items():
                 query = query.filter(getattr(KpiModel, column) == value)
             result = query.all()
-            LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result)))
+            if len(result) != 0:
+                LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result)))
+            else:
+                LOGGER.warning("No matching row found : {:s}".format(str(result)))
             return result
         except SQLAlchemyError as e:
             LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
@@ -178,7 +200,10 @@ class TelemetryDBmanager:
             for column, value in filters.items():
                 query = query.filter(getattr(CollectorModel, column) == value)
             result = query.all()
-            LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result)))
+            if len(result) != 0:
+                LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result)))
+            else:
+                LOGGER.warning("No matching row found : {:s}".format(str(result)))
             return result
         except SQLAlchemyError as e:
             LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
@@ -213,11 +238,11 @@ class TelemetryDBmanager:
             if collector:
                 session.delete(collector)
                 session.commit()
-                LOGGER.info("Deleted KPI with kpi_id: %s", collector_id_to_delete)
+                LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete)
             else:
-                LOGGER.warning("KPI with kpi_id %s not found", collector_id_to_delete)
+                LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete)
         except SQLAlchemyError as e:
             session.rollback()
-            LOGGER.error("Error deleting KPI with kpi_id %s: %s", collector_id_to_delete, e)
+            LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e)
         finally:
             session.close()
\ No newline at end of file
@@ -30,7 +30,7 @@ class TelemetryEngine:
     def get_engine() -> sqlalchemy.engine.Engine:
         CRDB_NAMESPACE = "crdb"
         CRDB_SQL_PORT = "26257"
-        CRDB_DATABASE = "TelemetryFrontend"
+        CRDB_DATABASE = "telemetryfrontend"
         CRDB_USERNAME = "tfs"
         CRDB_PASSWORD = "tfs123"
         CRDB_SSLMODE = "require"
@@ -41,8 +41,8 @@ class TelemetryEngine:
         try:
             # engine = sqlalchemy.create_engine(
             #     crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
-            engine = sqlalchemy.create_engine(crdb_uri)
-            LOGGER.info(' --- TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri))
+            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
+            LOGGER.info(' TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri))
         except: # pylint: disable=bare-except # pragma: no cover
             LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
             return None # type: ignore
...
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
LOGGER = logging.getLogger(__name__)
TELEMETRY_DB_NAME = "telemetryfrontend"
# Create a base class for declarative models
Base = declarative_base()
class managementDB:
    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            return False
        self.db_name = TELEMETRY_DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)
    def create_database(self):
        try:
            with self.db_engine.connect() as connection:
                connection.execute(f"CREATE DATABASE {self.db_name};")
            LOGGER.info('managementDB initalizes database. Name: {self.db_name}')
            return True
        except:
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
            return False
    def create_tables(self):
        try:
            Base.metadata.create_all(self.db_engine)     # type: ignore
            LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
    def verify_tables(self):
        try:
            with self.db_engine.connect() as connection:
                result = connection.execute("SHOW TABLES;")
                tables = result.fetchall()     # type: ignore
                LOGGER.info("Tables verified: {:}".format(tables))
        except Exception as e:
            LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
    def add_row_to_db(self, row):
        session = self.Session()
        try:
            session.add(row)
            session.commit()
            LOGGER.info(f"Row inserted into {row.__class__.__name__} table. {row.__class__.__name__} Id: : {row.collector_id}")
        except Exception as e:
            session.rollback()
            LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
        finally:
            session.close()
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from telemetry.database.managementDB import managementDB
from telemetry.database.tests.messages import create_collector_model_object
def test_add_row_to_db():
    managementDBobj = managementDB()
    managementDBobj.add_row_to_db(create_collector_model_object())
\ No newline at end of file
@@ -12,17 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
 import uuid
 import random
 from common.proto import telemetry_frontend_pb2
 from common.proto import kpi_manager_pb2
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
+from telemetry.database.TelemetryModel import Collector as CollectorModel
 def create_collector_request():
     _create_collector_request = telemetry_frontend_pb2.Collector()
     _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
-    _create_collector_request.kpi_id.kpi_id.uuid = '2a779f04-77a6-4b32-b020-893e0e1e656f' # must be primary key in kpi table
+    _create_collector_request.kpi_id.kpi_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4' # must be primary key in kpi table
     # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
     _create_collector_request.duration_s = float(random.randint(8, 16))
     _create_collector_request.interval_s = float(random.randint(2, 4))
@@ -43,19 +45,19 @@ def create_kpi_request():
 def create_kpi_id_request():
     _create_kpi_id_request = kpi_manager_pb2.KpiId()
-    _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0'
+    _create_kpi_id_request.kpi_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4'
     return _create_kpi_id_request
 def create_collector_id_request():
     _create_collector_id_request = telemetry_frontend_pb2.CollectorId()
-    _create_collector_id_request.collector_id.uuid = '50ba9199-7e9d-45b5-a2fc-3f97917bad65'
+    _create_collector_id_request.collector_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4'
     return _create_collector_id_request
 def create_kpi_filter_request():
     # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply.
     _create_kpi_filter_request = dict()
     _create_kpi_filter_request['kpi_sample_type'] = 102
-    _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0'
+    _create_kpi_filter_request['kpi_id'] = '3a17230d-8e95-4afb-8b21-6965481aee5a'
     return _create_kpi_filter_request
 def create_collector_filter_request():
@@ -64,3 +66,15 @@ def create_collector_filter_request():
     _create_kpi_filter_request['sampling_interval_s'] = 3.0
     # _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0'
     return _create_kpi_filter_request
+def create_collector_model_object():
+    # Create a new Collector instance
+    collector_to_insert = CollectorModel()
+    collector_to_insert.collector_id = str(uuid.uuid4())
+    collector_to_insert.kpi_id = '3a17230d-8e95-4afb-8b21-6965481aee5a'
+    collector_to_insert.collector = "Test collector description"
+    collector_to_insert.sampling_duration_s = 15
+    collector_to_insert.sampling_interval_s = 3
+    collector_to_insert.start_timestamp = time.time()
+    collector_to_insert.end_timestamp = time.time()
+    return collector_to_insert
\ No newline at end of file
@@ -15,6 +15,7 @@
 import logging
 from typing import Any
+from sqlalchemy.ext.declarative import declarative_base
 from telemetry.database.TelemetryDBmanager import TelemetryDBmanager
 from telemetry.database.TelemetryEngine import TelemetryEngine
 from telemetry.database.tests import temp_DB
@@ -25,42 +26,52 @@ from .messages import create_kpi_request, create_collector_request, \
 logging.basicConfig(level=logging.INFO)
 LOGGER = logging.getLogger(__name__)
 # def test_temp_DB():
 #     temp_DB.main()
 def test_telemetry_object_creation():
     LOGGER.info('--- test_telemetry_object_creation: START')
-    LOGGER.info('>>> Creating TelemetryDBmanager Object: ')
+    LOGGER.info('>>> Creating TelemetryDBmanager Object <<< ')
     TelemetryDBmanagerObj = TelemetryDBmanager()
-    # LOGGER.info('>>> Creating Tables: ')
+    # LOGGER.info('>>> Creating database <<< ')
+    # TelemetryDBmanagerObj.create_database()
+    # LOGGER.info('>>> verifing database <<< ')
+    # TelemetryDBmanagerObj.list_databases()
+    # # LOGGER.info('>>> Droping Tables: ')
+    # # TelemetryDBmanagerObj.drop_table("table_naem_here")
+    # LOGGER.info('>>> Creating Tables <<< ')
     # TelemetryDBmanagerObj.create_tables()
-    # LOGGER.info('>>> Verifing Table creation: ')
-    # TelemetryDBmanagerObj.verify_tables()
+    LOGGER.info('>>> Verifing Table creation <<< ')
+    TelemetryDBmanagerObj.verify_tables()
-    LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table')
+    LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table <<<')
     kpi_obj = create_kpi_request()
     TelemetryDBmanagerObj.inser_kpi(kpi_obj)
-    LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table')
+    LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table <<<')
     collector_obj = create_collector_request()
     TelemetryDBmanagerObj.insert_collector(collector_obj)
-    LOGGER.info('>>> TESTING: Get KpiDescriptor ')
+    LOGGER.info('>>> TESTING: Get KpiDescriptor <<<')
     kpi_id_obj = create_kpi_id_request()
     TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj)
-    LOGGER.info('>>> TESTING: Select Collector ')
+    LOGGER.info('>>> TESTING: Select Collector <<<')
     collector_id_obj = create_collector_id_request()
     TelemetryDBmanagerObj.get_collector(collector_id_obj)
-    LOGGER.info('>>> TESTING: Applying kpi filter ')
+    LOGGER.info('>>> TESTING: Applying kpi filter <<< ')
     kpi_filter : dict[str, Any] = create_kpi_filter_request()
    TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter)
-    LOGGER.info('>>> TESTING: Applying collector filter ')
+    LOGGER.info('>>> TESTING: Applying collector filter <<<')
     collector_filter : dict[str, Any] = create_collector_filter_request()
     TelemetryDBmanagerObj.select_collector(**collector_filter)
...
@@ -243,7 +243,7 @@ class DatabaseManager:
 # Example Usage
 def main():
     CRDB_SQL_PORT = "26257"
-    CRDB_DATABASE = "TelemetryFrontend"
+    CRDB_DATABASE = "telemetryfrontend"
     CRDB_USERNAME = "tfs"
     CRDB_PASSWORD = "tfs123"
     CRDB_SSLMODE = "require"
@@ -255,7 +255,7 @@ def main():
     db_manager = DatabaseManager(crdb_uri, CRDB_DATABASE)
     # Create database
-    # db_manager.create_database()
+    db_manager.create_database()
     # Update db_url to include the new database name
     db_manager.engine = create_engine(f"{crdb_uri}")
...
@@ -29,6 +29,8 @@ from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, Collecto
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer
+from telemetry.database.TelemetryModel import Collector as CollectorModel
+from telemetry.database.managementDB import managementDB
 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend')
@@ -41,6 +43,23 @@ KAFKA_TOPICS = {'request' : 'topic_request',
 class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     def __init__(self, name_mapping : NameMapping):
         LOGGER.info('Init TelemetryFrontendService')
+        self.managementDBobj = managementDB()
+    def add_collector_to_db(self, request: Collector ):
+        try:
+            # Create a new Collector instance
+            collector_to_insert = CollectorModel()
+            collector_to_insert.collector_id = request.collector_id.collector_id.uuid
+            collector_to_insert.kpi_id = '3a17230d-8e95-4afb-8b21-6965481aee5a'
+            collector_to_insert.collector = "Test collector description"
+            collector_to_insert.sampling_duration_s = request.duration_s
+            collector_to_insert.sampling_interval_s = request.interval_s
+            collector_to_insert.start_timestamp = time.time()
+            collector_to_insert.end_timestamp = time.time()
+            self.managementDBobj.add_row_to_db(collector_to_insert)
+        except Exception as e:
+            LOGGER.info("Unable to create collectorModel class object. {:}".format(e))
     # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StartCollector(self,
@@ -52,6 +71,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
         _collector_kpi_id = str(request.kpi_id.kpi_id.uuid)
         _collector_duration = int(request.duration_s)
         _collector_interval = int(request.interval_s)
+        # pushing Collector to DB
+        self.add_collector_to_db(request)
         self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
         # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval)
         response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore
@@ -74,10 +95,11 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
         # topic_request = "topic_request"
         msg_value = Tuple [str, int, int]
         msg_value = (kpi, duration, interval)
-        print ("Request generated: ", "Colletcor Id: ", msg_key, \
-            ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval)
+        # print ("Request generated: ", "Colletcor Id: ", msg_key, \
+        #     ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval)
         producerObj = KafkaProducer(producer_configs)
         producerObj.produce(KAFKA_TOPICS['request'], key=msg_key, value= str(msg_value), callback=self.delivery_callback)
+        LOGGER.info("Collector Request Generated: {:} -- {:} -- {:} -- {:}".format(msg_key, kpi, duration, interval))
         # producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback)
         ACTIVE_COLLECTORS.append(msg_key)
         producerObj.flush()
...
@@ -168,30 +168,33 @@ def telemetryFrontend_client(
 ###########################
 def test_start_collector(telemetryFrontend_client):
-    LOGGER.warning('test_start_collector requesting')
+    LOGGER.info('test_start_collector requesting')
     response = telemetryFrontend_client.StartCollector(create_collector_request())
     LOGGER.debug(str(response))
     assert isinstance(response, CollectorId)
-def test_start_collector_a(telemetryFrontend_client):
-    LOGGER.warning('test_start_collector requesting')
-    response = telemetryFrontend_client.StartCollector(create_collector_request())
-    LOGGER.debug(str(response))
-    assert isinstance(response, CollectorId)
-def test_start_collector_b(telemetryFrontend_client):
-    LOGGER.warning('test_start_collector requesting')
-    response = telemetryFrontend_client.StartCollector(create_collector_request())
-    LOGGER.debug(str(response))
-    assert isinstance(response, CollectorId)
-def test_run_kafka_listener():
-    LOGGER.warning('test_receive_kafka_request requesting')
-    name_mapping = NameMapping()
-    TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
-    response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
-    LOGGER.debug(str(response))
-    assert isinstance(response, bool)
+# def test_start_collector_a(telemetryFrontend_client):
+#     LOGGER.warning('test_start_collector requesting')
+#     response = telemetryFrontend_client.StartCollector(create_collector_request())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, CollectorId)
+# def test_start_collector_b(telemetryFrontend_client):
+#     LOGGER.warning('test_start_collector requesting')
+#     response = telemetryFrontend_client.StartCollector(create_collector_request())
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, CollectorId)
+# def test_run_kafka_listener():
+#     LOGGER.warning('test_receive_kafka_request requesting')
+#     name_mapping = NameMapping()
+#     TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping)
+#     response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto
+#     LOGGER.debug(str(response))
+#     assert isinstance(response, bool)
 # def test_stop_collector(telemetryFrontend_client):
 #     LOGGER.warning('test_stop_collector requesting')
...