From 5d3979fa1fec61e3f0851b6fe55cfbe3e5379101 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Wed, 29 May 2024 08:23:51 +0000 Subject: [PATCH] ManagementDB.py is created to manage DB operation of 'KPImanager' and 'TelemetryManager'. --- .../run_tests_locally-telemetry-frontend.sh | 2 +- scripts/run_tests_locally-telemetry-mgtDB.sh | 26 +++++++ src/telemetry/database/TelemetryDBmanager.py | 67 +++++++++++------ src/telemetry/database/TelemetryEngine.py | 6 +- src/telemetry/database/managementDB.py | 72 +++++++++++++++++++ .../database/tests/managementDBtests.py | 22 ++++++ src/telemetry/database/tests/messages.py | 24 +++++-- .../database/tests/telemetryDBtests.py | 31 +++++--- src/telemetry/database/tests/temp_DB.py | 4 +- .../TelemetryFrontendServiceServicerImpl.py | 26 ++++++- src/telemetry/frontend/tests/test_frontend.py | 39 +++++----- 11 files changed, 257 insertions(+), 62 deletions(-) create mode 100755 scripts/run_tests_locally-telemetry-mgtDB.sh create mode 100644 src/telemetry/database/managementDB.py create mode 100644 src/telemetry/database/tests/managementDBtests.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index c6ab54a34..673104af6 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -24,5 +24,5 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=INFO --verbose \ +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/frontend/tests/test_frontend.py \ No newline at end of file diff --git a/scripts/run_tests_locally-telemetry-mgtDB.sh b/scripts/run_tests_locally-telemetry-mgtDB.sh new file mode 100755 index 000000000..02a449abf --- /dev/null +++ b/scripts/run_tests_locally-telemetry-mgtDB.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +# RCFILE=$PROJECTDIR/coverage/.coveragerc +# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ +# kpi_manager/tests/test_unitary.py + +RCFILE=$PROJECTDIR/coverage/.coveragerc +python3 -m pytest --log-cli-level=INFO --verbose \ + telemetry/database/tests/managementDBtests.py \ No newline at end of file diff --git a/src/telemetry/database/TelemetryDBmanager.py b/src/telemetry/database/TelemetryDBmanager.py index 0380bc8ee..6dc2868a1 100644 --- a/src/telemetry/database/TelemetryDBmanager.py +++ b/src/telemetry/database/TelemetryDBmanager.py @@ -13,7 +13,8 @@ # limitations under the License. import logging, time -from sqlalchemy import inspect +import sqlalchemy +from sqlalchemy import inspect, MetaData, Table from sqlalchemy.orm import sessionmaker from telemetry.database.TelemetryModel import Collector as CollectorModel from telemetry.database.TelemetryModel import Kpi as KpiModel @@ -22,13 +23,10 @@ from telemetry.database.TelemetryEngine import TelemetryEngine from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.telemetry_frontend_pb2 import Collector, CollectorId from sqlalchemy.exc import SQLAlchemyError - +from telemetry.database.TelemetryModel import Base LOGGER = logging.getLogger(__name__) -DB_NAME = "TelemetryFrontend" - -# Create a base class for declarative models -Base = declarative_base() +DB_NAME = "telemetryfrontend" class TelemetryDBmanager: def __init__(self): @@ -41,18 +39,19 @@ class TelemetryDBmanager: def create_database(self): try: - with self.db_engine.connect() as connection: - connection.execute(f"CREATE DATABASE {self.db_name};") - LOGGER.info('TelemetryDBmanager initalized DB Name: {self.db_name}') + # with self.db_engine.connect() as connection: + # connection.execute(f"CREATE DATABASE {self.db_name};") + TelemetryEngine.create_database(self.db_engine) + LOGGER.info('TelemetryDBmanager initalized DB Name: {:}'.format(self.db_name)) return True - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) + except Exception as e: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e))) return False def create_tables(self): try: Base.metadata.create_all(self.db_engine) # type: ignore - LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name)) + LOGGER.info("Tables created in database ({:}) the as per Models".format(self.db_name)) except Exception as e: LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) @@ -61,10 +60,30 @@ class TelemetryDBmanager: with self.db_engine.connect() as connection: result = connection.execute("SHOW TABLES;") tables = result.fetchall() - LOGGER.info("Tables verified: {:}".format(tables)) + LOGGER.info("Tables in DB: {:}".format(tables)) except Exception as e: LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + def drop_table(self, table_to_drop: str): + try: + inspector = inspect(self.db_engine) + existing_tables = inspector.get_table_names() + if table_to_drop in existing_tables: + table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine) + table.drop(self.db_engine) + LOGGER.info("Tables delete in the DB Name: {:}".format(self.db_name)) + else: + LOGGER.warning("No table {:} in database {:} ".format(table_to_drop, DB_NAME)) + except Exception as e: + LOGGER.info("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e))) + + def list_databases(self): + query = "SHOW DATABASES" + with self.db_engine.connect() as connection: + result = connection.execute(query) + databases = [row[0] for row in result] + LOGGER.info("List of available DBs: {:}".format(databases)) + # ------------------ INSERT METHODs -------------------------------------- def inser_kpi(self, request: KpiDescriptor): @@ -84,7 +103,7 @@ class TelemetryDBmanager: # Add the instance to the session session.add(kpi_to_insert) session.commit() - LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert)) + LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id)) except Exception as e: session.rollback() LOGGER.info("Failed to insert new kpi. {:s}".format(str(e))) @@ -108,7 +127,7 @@ class TelemetryDBmanager: session.add(collector_to_insert) session.commit() - LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert)) + LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id)) except Exception as e: session.rollback() LOGGER.info("Failed to insert new collector. {:s}".format(str(e))) @@ -127,7 +146,7 @@ class TelemetryDBmanager: LOGGER.info("kpi ID found: {:s}".format(str(kpi))) return kpi else: - LOGGER.warning("Kpi ID not found{:s}".format(str(kpi_id_to_search))) + LOGGER.warning("Kpi ID not found {:s}".format(str(kpi_id_to_search))) return None except Exception as e: session.rollback() @@ -163,7 +182,10 @@ class TelemetryDBmanager: for column, value in filters.items(): query = query.filter(getattr(KpiModel, column) == value) result = query.all() - LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) + if len(result) != 0: + LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) + else: + LOGGER.warning("No matching row found : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) @@ -178,7 +200,10 @@ class TelemetryDBmanager: for column, value in filters.items(): query = query.filter(getattr(CollectorModel, column) == value) result = query.all() - LOGGER.info("Fetched filtered rows from KPI table with filters ---------- : {:s}".format(str(result))) + if len(result) != 0: + LOGGER.info("Fetched filtered rows from KPI table with filters : {:s}".format(str(result))) + else: + LOGGER.warning("No matching row found : {:s}".format(str(result))) return result except SQLAlchemyError as e: LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e)) @@ -213,11 +238,11 @@ class TelemetryDBmanager: if collector: session.delete(collector) session.commit() - LOGGER.info("Deleted KPI with kpi_id: %s", collector_id_to_delete) + LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete) else: - LOGGER.warning("KPI with kpi_id %s not found", collector_id_to_delete) + LOGGER.warning("collector with collector_id %s not found", collector_id_to_delete) except SQLAlchemyError as e: session.rollback() - LOGGER.error("Error deleting KPI with kpi_id %s: %s", collector_id_to_delete, e) + LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e) finally: session.close() \ No newline at end of file diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py index d6e54cc2f..ebeaf3787 100644 --- a/src/telemetry/database/TelemetryEngine.py +++ b/src/telemetry/database/TelemetryEngine.py @@ -30,7 +30,7 @@ class TelemetryEngine: def get_engine() -> sqlalchemy.engine.Engine: CRDB_NAMESPACE = "crdb" CRDB_SQL_PORT = "26257" - CRDB_DATABASE = "TelemetryFrontend" + CRDB_DATABASE = "telemetryfrontend" CRDB_USERNAME = "tfs" CRDB_PASSWORD = "tfs123" CRDB_SSLMODE = "require" @@ -41,8 +41,8 @@ class TelemetryEngine: try: # engine = sqlalchemy.create_engine( # crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) - engine = sqlalchemy.create_engine(crdb_uri) - LOGGER.info(' --- TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri)) + engine = sqlalchemy.create_engine(crdb_uri, echo=False) + LOGGER.info(' TelemetryDBmanager initalized with DB URL: {:}'.format(crdb_uri)) except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore diff --git a/src/telemetry/database/managementDB.py b/src/telemetry/database/managementDB.py new file mode 100644 index 000000000..f8d0ef9cb --- /dev/null +++ b/src/telemetry/database/managementDB.py @@ -0,0 +1,72 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, time +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.declarative import declarative_base +from telemetry.database.TelemetryEngine import TelemetryEngine + + +LOGGER = logging.getLogger(__name__) +TELEMETRY_DB_NAME = "telemetryfrontend" + +# Create a base class for declarative models +Base = declarative_base() + +class managementDB: + def __init__(self): + self.db_engine = TelemetryEngine.get_engine() + if self.db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return False + self.db_name = TELEMETRY_DB_NAME + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): + try: + with self.db_engine.connect() as connection: + connection.execute(f"CREATE DATABASE {self.db_name};") + LOGGER.info('managementDB initalizes database. Name: {self.db_name}') + return True + except: + LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url))) + return False + + def create_tables(self): + try: + Base.metadata.create_all(self.db_engine) # type: ignore + LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name)) + except Exception as e: + LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e))) + + def verify_tables(self): + try: + with self.db_engine.connect() as connection: + result = connection.execute("SHOW TABLES;") + tables = result.fetchall() # type: ignore + LOGGER.info("Tables verified: {:}".format(tables)) + except Exception as e: + LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + + def add_row_to_db(self, row): + session = self.Session() + try: + session.add(row) + session.commit() + LOGGER.info(f"Row inserted into {row.__class__.__name__} table. {row.__class__.__name__} Id: : {row.collector_id}") + except Exception as e: + session.rollback() + LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") + finally: + session.close() \ No newline at end of file diff --git a/src/telemetry/database/tests/managementDBtests.py b/src/telemetry/database/tests/managementDBtests.py new file mode 100644 index 000000000..3d7ef6615 --- /dev/null +++ b/src/telemetry/database/tests/managementDBtests.py @@ -0,0 +1,22 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from telemetry.database.managementDB import managementDB +from telemetry.database.tests.messages import create_collector_model_object + + +def test_add_row_to_db(): + managementDBobj = managementDB() + managementDBobj.add_row_to_db(create_collector_model_object()) \ No newline at end of file diff --git a/src/telemetry/database/tests/messages.py b/src/telemetry/database/tests/messages.py index 258d4a844..6452e79e7 100644 --- a/src/telemetry/database/tests/messages.py +++ b/src/telemetry/database/tests/messages.py @@ -12,17 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import uuid import random from common.proto import telemetry_frontend_pb2 from common.proto import kpi_manager_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType +from telemetry.database.TelemetryModel import Collector as CollectorModel def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = '2a779f04-77a6-4b32-b020-893e0e1e656f' # must be primary key in kpi table + _create_collector_request.kpi_id.kpi_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4' # must be primary key in kpi table # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) _create_collector_request.duration_s = float(random.randint(8, 16)) _create_collector_request.interval_s = float(random.randint(2, 4)) @@ -43,19 +45,19 @@ def create_kpi_request(): def create_kpi_id_request(): _create_kpi_id_request = kpi_manager_pb2.KpiId() - _create_kpi_id_request.kpi_id.uuid = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' + _create_kpi_id_request.kpi_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4' return _create_kpi_id_request def create_collector_id_request(): _create_collector_id_request = telemetry_frontend_pb2.CollectorId() - _create_collector_id_request.collector_id.uuid = '50ba9199-7e9d-45b5-a2fc-3f97917bad65' + _create_collector_id_request.collector_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4' return _create_collector_id_request def create_kpi_filter_request(): # create a dict as follows: 'Key' = 'KpiModel' column name and 'Value' = filter to apply. _create_kpi_filter_request = dict() _create_kpi_filter_request['kpi_sample_type'] = 102 - _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' + _create_kpi_filter_request['kpi_id'] = '3a17230d-8e95-4afb-8b21-6965481aee5a' return _create_kpi_filter_request def create_collector_filter_request(): @@ -63,4 +65,16 @@ def create_collector_filter_request(): _create_kpi_filter_request = dict() _create_kpi_filter_request['sampling_interval_s'] = 3.0 # _create_kpi_filter_request['kpi_id'] = '11e2c6c6-b507-40aa-ab3a-ffd41e7125f0' - return _create_kpi_filter_request \ No newline at end of file + return _create_kpi_filter_request + +def create_collector_model_object(): + # Create a new Collector instance + collector_to_insert = CollectorModel() + collector_to_insert.collector_id = str(uuid.uuid4()) + collector_to_insert.kpi_id = '3a17230d-8e95-4afb-8b21-6965481aee5a' + collector_to_insert.collector = "Test collector description" + collector_to_insert.sampling_duration_s = 15 + collector_to_insert.sampling_interval_s = 3 + collector_to_insert.start_timestamp = time.time() + collector_to_insert.end_timestamp = time.time() + return collector_to_insert \ No newline at end of file diff --git a/src/telemetry/database/tests/telemetryDBtests.py b/src/telemetry/database/tests/telemetryDBtests.py index 81431beb7..14def9ef2 100644 --- a/src/telemetry/database/tests/telemetryDBtests.py +++ b/src/telemetry/database/tests/telemetryDBtests.py @@ -15,6 +15,7 @@ import logging from typing import Any +from sqlalchemy.ext.declarative import declarative_base from telemetry.database.TelemetryDBmanager import TelemetryDBmanager from telemetry.database.TelemetryEngine import TelemetryEngine from telemetry.database.tests import temp_DB @@ -25,42 +26,52 @@ from .messages import create_kpi_request, create_collector_request, \ logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) + # def test_temp_DB(): # temp_DB.main() def test_telemetry_object_creation(): LOGGER.info('--- test_telemetry_object_creation: START') - LOGGER.info('>>> Creating TelemetryDBmanager Object: ') + LOGGER.info('>>> Creating TelemetryDBmanager Object <<< ') TelemetryDBmanagerObj = TelemetryDBmanager() - # LOGGER.info('>>> Creating Tables: ') + # LOGGER.info('>>> Creating database <<< ') + # TelemetryDBmanagerObj.create_database() + + # LOGGER.info('>>> verifing database <<< ') + # TelemetryDBmanagerObj.list_databases() + + # # LOGGER.info('>>> Droping Tables: ') + # # TelemetryDBmanagerObj.drop_table("table_naem_here") + + # LOGGER.info('>>> Creating Tables <<< ') # TelemetryDBmanagerObj.create_tables() - # LOGGER.info('>>> Verifing Table creation: ') - # TelemetryDBmanagerObj.verify_tables() + LOGGER.info('>>> Verifing Table creation <<< ') + TelemetryDBmanagerObj.verify_tables() - LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table') + LOGGER.info('>>> TESTING: Row Insertion Operation: kpi Table <<<') kpi_obj = create_kpi_request() TelemetryDBmanagerObj.inser_kpi(kpi_obj) - LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table') + LOGGER.info('>>> TESTING: Row Insertion Operation: collector Table <<<') collector_obj = create_collector_request() TelemetryDBmanagerObj.insert_collector(collector_obj) - LOGGER.info('>>> TESTING: Get KpiDescriptor ') + LOGGER.info('>>> TESTING: Get KpiDescriptor <<<') kpi_id_obj = create_kpi_id_request() TelemetryDBmanagerObj.get_kpi_descriptor(kpi_id_obj) - LOGGER.info('>>> TESTING: Select Collector ') + LOGGER.info('>>> TESTING: Select Collector <<<') collector_id_obj = create_collector_id_request() TelemetryDBmanagerObj.get_collector(collector_id_obj) - LOGGER.info('>>> TESTING: Applying kpi filter ') + LOGGER.info('>>> TESTING: Applying kpi filter <<< ') kpi_filter : dict[str, Any] = create_kpi_filter_request() TelemetryDBmanagerObj.select_kpi_descriptor(**kpi_filter) - LOGGER.info('>>> TESTING: Applying collector filter ') + LOGGER.info('>>> TESTING: Applying collector filter <<<') collector_filter : dict[str, Any] = create_collector_filter_request() TelemetryDBmanagerObj.select_collector(**collector_filter) diff --git a/src/telemetry/database/tests/temp_DB.py b/src/telemetry/database/tests/temp_DB.py index 7c1074fcf..089d35424 100644 --- a/src/telemetry/database/tests/temp_DB.py +++ b/src/telemetry/database/tests/temp_DB.py @@ -243,7 +243,7 @@ class DatabaseManager: # Example Usage def main(): CRDB_SQL_PORT = "26257" - CRDB_DATABASE = "TelemetryFrontend" + CRDB_DATABASE = "telemetryfrontend" CRDB_USERNAME = "tfs" CRDB_PASSWORD = "tfs123" CRDB_SSLMODE = "require" @@ -255,7 +255,7 @@ def main(): db_manager = DatabaseManager(crdb_uri, CRDB_DATABASE) # Create database - # db_manager.create_database() + db_manager.create_database() # Update db_url to include the new database name db_manager.engine = create_engine(f"{crdb_uri}") diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index f940ccd65..62a8969f9 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -29,6 +29,8 @@ from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, Collecto from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer +from telemetry.database.TelemetryModel import Collector as CollectorModel +from telemetry.database.managementDB import managementDB LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Monitoring', 'TelemetryFrontend') @@ -41,6 +43,23 @@ KAFKA_TOPICS = {'request' : 'topic_request', class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self, name_mapping : NameMapping): LOGGER.info('Init TelemetryFrontendService') + self.managementDBobj = managementDB() + + + def add_collector_to_db(self, request: Collector ): + try: + # Create a new Collector instance + collector_to_insert = CollectorModel() + collector_to_insert.collector_id = request.collector_id.collector_id.uuid + collector_to_insert.kpi_id = '3a17230d-8e95-4afb-8b21-6965481aee5a' + collector_to_insert.collector = "Test collector description" + collector_to_insert.sampling_duration_s = request.duration_s + collector_to_insert.sampling_interval_s = request.interval_s + collector_to_insert.start_timestamp = time.time() + collector_to_insert.end_timestamp = time.time() + self.managementDBobj.add_row_to_db(collector_to_insert) + except Exception as e: + LOGGER.info("Unable to create collectorModel class object. {:}".format(e)) # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, @@ -52,6 +71,8 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): _collector_kpi_id = str(request.kpi_id.kpi_id.uuid) _collector_duration = int(request.duration_s) _collector_interval = int(request.interval_s) + # pushing Collector to DB + self.add_collector_to_db(request) self.generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) # self.run_generate_kafka_request(_collector_id, _collector_kpi_id, _collector_duration, _collector_interval) response.collector_id.uuid = request.collector_id.collector_id.uuid # type: ignore @@ -74,10 +95,11 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): # topic_request = "topic_request" msg_value = Tuple [str, int, int] msg_value = (kpi, duration, interval) - print ("Request generated: ", "Colletcor Id: ", msg_key, \ - ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) + # print ("Request generated: ", "Colletcor Id: ", msg_key, \ + # ", \nKPI: ", kpi, ", Duration: ", duration, ", Interval: ", interval) producerObj = KafkaProducer(producer_configs) producerObj.produce(KAFKA_TOPICS['request'], key=msg_key, value= str(msg_value), callback=self.delivery_callback) + LOGGER.info("Collector Request Generated: {:} -- {:} -- {:} -- {:}".format(msg_key, kpi, duration, interval)) # producerObj.produce(topic_request, key=msg_key, value= str(msg_value), callback=self.delivery_callback) ACTIVE_COLLECTORS.append(msg_key) producerObj.flush() diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index a531ed617..230122a2d 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -168,30 +168,33 @@ def telemetryFrontend_client( ########################### def test_start_collector(telemetryFrontend_client): - LOGGER.warning('test_start_collector requesting') + LOGGER.info('test_start_collector requesting') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) -def test_start_collector_a(telemetryFrontend_client): - LOGGER.warning('test_start_collector requesting') - response = telemetryFrontend_client.StartCollector(create_collector_request()) - LOGGER.debug(str(response)) - assert isinstance(response, CollectorId) +# def test_start_collector_a(telemetryFrontend_client): +# LOGGER.warning('test_start_collector requesting') +# response = telemetryFrontend_client.StartCollector(create_collector_request()) +# LOGGER.debug(str(response)) +# assert isinstance(response, CollectorId) + +# def test_start_collector_b(telemetryFrontend_client): +# LOGGER.warning('test_start_collector requesting') +# response = telemetryFrontend_client.StartCollector(create_collector_request()) +# LOGGER.debug(str(response)) +# assert isinstance(response, CollectorId) + +# def test_run_kafka_listener(): +# LOGGER.warning('test_receive_kafka_request requesting') +# name_mapping = NameMapping() +# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) +# response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto +# LOGGER.debug(str(response)) +# assert isinstance(response, bool) + -def test_start_collector_b(telemetryFrontend_client): - LOGGER.warning('test_start_collector requesting') - response = telemetryFrontend_client.StartCollector(create_collector_request()) - LOGGER.debug(str(response)) - assert isinstance(response, CollectorId) -def test_run_kafka_listener(): - LOGGER.warning('test_receive_kafka_request requesting') - name_mapping = NameMapping() - TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl(name_mapping) - response = TelemetryFrontendServiceObj.run_kafka_listener() # Method "run_kafka_listener" is not define in frontend.proto - LOGGER.debug(str(response)) - assert isinstance(response, bool) # def test_stop_collector(telemetryFrontend_client): # LOGGER.warning('test_stop_collector requesting') -- GitLab