From 6c2d2c40e0ab2ca1999036e43182e0b492fb713e Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Thu, 26 Sep 2024 09:49:18 +0000
Subject: [PATCH 01/13] Generic DB class.

- A generic DB class is added in common.tools.database.
---
 src/common/tools/database/GenericDatabase.py | 139 +++++++++++++++++++
 src/common/tools/database/GenericEngine.py   |  40 ++++++
 src/common/tools/database/__init__.py        |  14 ++
 3 files changed, 193 insertions(+)
 create mode 100644 src/common/tools/database/GenericDatabase.py
 create mode 100644 src/common/tools/database/GenericEngine.py
 create mode 100644 src/common/tools/database/__init__.py

diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py
new file mode 100644
index 000000000..7c6453d7c
--- /dev/null
+++ b/src/common/tools/database/GenericDatabase.py
@@ -0,0 +1,139 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import sqlalchemy_utils
+from .GenericEngine import Engine
+from sqlalchemy import inspect
+from sqlalchemy.orm import sessionmaker
+
+from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
+
+LOGGER = logging.getLogger(__name__)
+
+class Database:
+    def __init__(self, db_name, model):
+        self.db_engine = Engine.get_engine(db_name)
+        if self.db_engine is None:
+            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
+            raise Exception('Failed to initialize the database engine.')
+        self.db_name  = db_name
+        self.db_model = model
+        self.db_table = model.__name__
+        self.Session  = sessionmaker(bind=self.db_engine)
+
+    def create_database(self):
+        if not sqlalchemy_utils.database_exists(self.db_engine.url):
+            sqlalchemy_utils.create_database(self.db_engine.url)
+            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
+
+    def drop_database(self) -> None:
+        if sqlalchemy_utils.database_exists(self.db_engine.url):
+            sqlalchemy_utils.drop_database(self.db_engine.url)
+
+    def create_tables(self):
+        try:
+            self.db_model.metadata.create_all(self.db_engine)
+            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
+        except Exception as e:
+            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
+            raise OperationFailedException("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
+
+    def verify_tables(self):
+        try:
+            inspect_object = inspect(self.db_engine)
+            if inspect_object.has_table(self.db_table, None):
+                LOGGER.info("Table exists in DB: {:}".format(self.db_name))
+        except Exception as e:
+            LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
{:s}".format(str(e))) + +# ----------------- DB OPERATIONS --------------------- + + def add_row_to_db(self, row): + session = self.Session() + try: + session.add(row) + session.commit() + LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") + return True + except Exception as e: + session.rollback() + if "psycopg2.errors.UniqueViolation" in str(e): + LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") + raise AlreadyExistsException(row.__class__.__name__, row, + extra_details=["Unique key voilation: {:}".format(e)] ) + else: + LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def search_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + entity = session.query(model).filter_by(**{col_name: id_to_search}).first() + if entity: + # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") + return entity + else: + LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") + print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) + return None + except Exception as e: + session.rollback() + LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") + raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) + finally: + session.close() + + def delete_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + record = session.query(model).filter_by(**{col_name: id_to_search}).first() + if record: + session.delete(record) + session.commit() + LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) + else: + LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) + return None + except Exception as e: + session.rollback() + LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def select_with_filter(self, query_object, session, model): + """ + Generic method to apply filters dynamically based on filter. + params: model_name: SQLAlchemy model class name. + query_object : Object that contains query with applied filters. + session: session of the query. + return: List of filtered records. 
+        """
+        try:
+            result = query_object.all()
+            # Log result and handle empty case
+            if result:
+                LOGGER.debug(f"Fetched filtered rows from {model.__name__} with filters: {query_object}")
+            else:
+                LOGGER.warning(f"No matching rows found in {model.__name__} with filters: {query_object}")
+            return result
+        except Exception as e:
+            LOGGER.error(f"Error fetching filtered rows from {model.__name__} with filters {query_object} ::: {e}")
+            raise OperationFailedException("Select by filter", extra_details=[f"Unable to apply the filter: {e}"])
+        finally:
+            session.close()

diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py
new file mode 100644
index 000000000..ff3def466
--- /dev/null
+++ b/src/common/tools/database/GenericEngine.py
@@ -0,0 +1,40 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, sqlalchemy
+from common.Settings import get_setting
+
+LOGGER = logging.getLogger(__name__)
+CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
+
+class Engine:
+    @staticmethod
+    def get_engine(db_name) -> sqlalchemy.engine.Engine:
+        crdb_uri = get_setting('CRDB_URI', default=None)
+        if crdb_uri is None:
+            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
+            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
+            CRDB_DATABASE  = db_name
+            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
+            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
+            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
+            crdb_uri = CRDB_URI_TEMPLATE.format(
+                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
+        try:
+            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
+            LOGGER.info('Database engine initialized with DB URL: {:}'.format(crdb_uri))
+        except: # pylint: disable=bare-except # pragma: no cover
+            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
+            return None # type: ignore
+        return engine

diff --git a/src/common/tools/database/__init__.py b/src/common/tools/database/__init__.py
new file mode 100644
index 000000000..3ee6f7071
--- /dev/null
+++ b/src/common/tools/database/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
--
GitLab
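
For orientation, a minimal sketch of how a service-specific class can build on the
generic Database/Engine pair introduced above. ExampleDB, ExampleModel and the
"tfs_example" name are hypothetical, not part of the patch:

    # Hypothetical consumer of the generic class from PATCH 01.
    from common.tools.database.GenericDatabase import Database
    from my_service.database.ExampleModel import Example as ExampleModel  # hypothetical model

    class ExampleDB(Database):
        def __init__(self) -> None:
            # the name must match the CockroachDB database provisioned for the service
            super().__init__("tfs_example", ExampleModel)

    db = ExampleDB()
    db.create_database()              # no-op if the database already exists
    db.create_tables()                # creates tables from the model's metadata
    db.add_row_to_db(ExampleModel())  # raises AlreadyExistsException on duplicate keys
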

From 56e5fd59573b444a33416ce2ea44bfb531e24780 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Thu, 26 Sep 2024 10:04:11 +0000
Subject: [PATCH 02/13] Changes in KpiManager to incorporate Generic DB change.

---
 src/kpi_manager/database/new_KpiDB.py            | 77 +++++++++++++++++++
 .../service/KpiManagerServiceServicerImpl.py     |  3 +-
 src/kpi_manager/tests/__init__.py                | 14 ++++
 src/kpi_manager/tests/test_kpi_db.py             | 26 ++++++-
 src/kpi_manager/tests/test_kpi_manager.py        | 12 +--
 5 files changed, 124 insertions(+), 8 deletions(-)
 create mode 100644 src/kpi_manager/database/new_KpiDB.py
 create mode 100644 src/kpi_manager/tests/__init__.py

diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/new_KpiDB.py
new file mode 100644
index 000000000..ecea0a2ad
--- /dev/null
+++ b/src/kpi_manager/database/new_KpiDB.py
@@ -0,0 +1,77 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
+from kpi_manager.database.KpiModel import Kpi as KpiModel
+from common.tools.database.GenericDatabase import Database
+from common.method_wrappers.ServiceExceptions import OperationFailedException
+
+LOGGER = logging.getLogger(__name__)
+METRICS_POOL = MetricsPool('KpiManager', 'Database')
+
+class KpiDB(Database):
+    def __init__(self) -> None:
+        LOGGER.info('Init KpiManagerService')
+        super().__init__("tfs_kpi_mgmt", KpiModel)
+
+    def add_row_to_db(self, row):
+        return super().add_row_to_db(row)
+
+    def search_db_row_by_id(self, model, col_name, id_to_search):
+        return super().search_db_row_by_id(model, col_name, id_to_search)
+
+    def delete_db_row_by_id(self, model, col_name, id_to_search):
+        return super().delete_db_row_by_id(model, col_name, id_to_search)
+
+    def select_with_filter(self, model, filter_object):
+        """
+        Generic method to create filters dynamically based on filter_object attributes.
+        params: model: SQLAlchemy model class to query.
+                filter_object: Object that contains filtering criteria as attributes.
+        return: SQLAlchemy session, query and Model
+        """
+        session = self.Session()
+        try:
+            query = session.query(KpiModel)
+            # Apply filters based on the filter_object
+            if filter_object.kpi_id:
+                query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
+
+            if filter_object.kpi_sample_type:
+                query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type))
+
+            if filter_object.device_id:
+                query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
+
+            if filter_object.endpoint_id:
+                query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
+
+            if filter_object.service_id:
+                query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
+
+            if filter_object.slice_id:
+                query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
+
+            if filter_object.connection_id:
+                query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
+
+            if filter_object.link_id:
+                query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
+        except Exception as e:
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
+
+        return super().select_with_filter(query, session, KpiModel)

diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
index fd2247482..39bc9e0b0 100644
--- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
+++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
@@ -18,7 +18,8 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m
 from common.proto.context_pb2 import Empty
 from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer
 from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
-from kpi_manager.database.Kpi_DB import KpiDB
+# from kpi_manager.database.Kpi_DB import KpiDB
+from kpi_manager.database.new_KpiDB import KpiDB
 from kpi_manager.database.KpiModel import Kpi as KpiModel
 
 LOGGER = logging.getLogger(__name__)

diff --git a/src/kpi_manager/tests/__init__.py b/src/kpi_manager/tests/__init__.py
new file mode 100644
index 000000000..3ee6f7071
--- /dev/null
+++ b/src/kpi_manager/tests/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

diff --git a/src/kpi_manager/tests/test_kpi_db.py b/src/kpi_manager/tests/test_kpi_db.py
index d4a57f836..b9c3b2d7c 100644
--- a/src/kpi_manager/tests/test_kpi_db.py
+++ b/src/kpi_manager/tests/test_kpi_db.py
@@ -14,7 +14,12 @@
 
 import logging
-from kpi_manager.database.Kpi_DB import KpiDB
+# from kpi_manager.database.Kpi_DB import KpiDB
+from common.proto.kpi_manager_pb2 import KpiDescriptorList
+from .test_messages import create_kpi_filter_request
+from kpi_manager.database.KpiModel import Kpi as KpiModel
+from kpi_manager.database.new_KpiDB import KpiDB
+# from common.tools.database.GenericDatabase import Database
 
 LOGGER = logging.getLogger(__name__)
 
@@ -26,3 +31,22 @@ def test_verify_databases_and_Tables():
     kpiDBobj.create_database()
     kpiDBobj.create_tables()
     kpiDBobj.verify_tables()
+
+# def test_generic_DB_select_method():
+#     LOGGER.info("--> STARTED-test_generic_DB_select_method")
+#     kpi_obj = KpiDB()
+#     _filter = create_kpi_filter_request()
+#     # response = KpiDescriptorList()
+#     try:
+#         kpi_obj.select_with_filter(KpiModel, _filter)
+#     except Exception as e:
+#         LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e))
+#     LOGGER.info("--> FINISHED-test_generic_DB_select_method")
+#     # try:
+#     #     for row in rows:
+#     #         kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
+#     #         response.kpi_descriptor_list.append(kpiDescriptor_obj)
+#     #     return response
+#     # except Exception as e:
+#     #     LOGGER.info('Unable to process filter response {:}'.format(e))
+#     # assert isinstance(r)

diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py
index 219fdadee..06e836b70 100755
--- a/src/kpi_manager/tests/test_kpi_manager.py
+++ b/src/kpi_manager/tests/test_kpi_manager.py
@@ -139,9 +139,9 @@ def test_SelectKpiDescriptor(kpi_manager_client):
     LOGGER.info("Response gRPC message object: {:}".format(response))
     assert isinstance(response, KpiDescriptorList)
 
-def test_set_list_of_KPIs(kpi_manager_client):
-    LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
-    KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
-    # adding KPI
-    for kpi in KPIs_TO_SEARCH:
-        kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
+# def test_set_list_of_KPIs(kpi_manager_client):
+#     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
+#     KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
+#     # adding KPI
+#     for kpi in KPIs_TO_SEARCH:
+#         kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
--
GitLab
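
The commented-out test above hints at the intended call pattern. A sketch of how a
servicer might consume select_with_filter and build a gRPC response; the
select_kpi_descriptors wrapper is illustrative, while convert_row_to_KpiDescriptor
is the helper referenced by the commented test:

    # Illustrative only: converts rows from KpiDB.select_with_filter into a
    # KpiDescriptorList response message.
    from common.proto.kpi_manager_pb2 import KpiDescriptorList
    from kpi_manager.database.KpiModel import Kpi as KpiModel
    from kpi_manager.database.new_KpiDB import KpiDB

    def select_kpi_descriptors(filter_request) -> KpiDescriptorList:
        kpi_db   = KpiDB()
        response = KpiDescriptorList()
        rows = kpi_db.select_with_filter(KpiModel, filter_request)
        for row in rows:
            # helper referenced by the commented test above
            response.kpi_descriptor_list.append(KpiModel.convert_row_to_KpiDescriptor(row))
        return response
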

From 48748aa988e11c6fee6d8f29f3034551179d97eb Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Thu, 26 Sep 2024 10:57:06 +0000
Subject: [PATCH 03/13] Refactor TelemetryFrontend to reflect Generic DB
 changes.

---
 src/kpi_manager/database/new_KpiDB.py         | 26 ++++-----
 src/telemetry/database/new_Telemetry_DB.py    | 57 +++++++++++++++++++
 .../TelemetryFrontendServiceServicerImpl.py   |  3 +-
 src/telemetry/tests/test_telemetryDB.py       |  6 +-
 4 files changed, 75 insertions(+), 17 deletions(-)
 create mode 100644 src/telemetry/database/new_Telemetry_DB.py

diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/new_KpiDB.py
index ecea0a2ad..64d2dc10b 100644
--- a/src/kpi_manager/database/new_KpiDB.py
+++ b/src/kpi_manager/database/new_KpiDB.py
@@ -14,8 +14,8 @@
 
 import logging
 
-from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
-from kpi_manager.database.KpiModel import Kpi as KpiModel
+from common.method_wrappers.Decorator import MetricsPool
+from kpi_manager.database.KpiModel import Kpi as Model
 from common.tools.database.GenericDatabase import Database
 from common.method_wrappers.ServiceExceptions import OperationFailedException
 
@@ -25,7 +25,7 @@ METRICS_POOL = MetricsPool('KpiManager', 'Database')
 class KpiDB(Database):
     def __init__(self) -> None:
         LOGGER.info('Init KpiManagerService')
-        super().__init__("tfs_kpi_mgmt", KpiModel)
+        super().__init__("tfs_kpi_mgmt", Model)
 
     def add_row_to_db(self, row):
         return super().add_row_to_db(row)
@@ -45,33 +45,33 @@ class KpiDB(Database):
         """
         session = self.Session()
         try:
-            query = session.query(KpiModel)
+            query = session.query(Model)
             # Apply filters based on the filter_object
             if filter_object.kpi_id:
-                query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
+                query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
 
             if filter_object.kpi_sample_type:
-                query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type))
+                query = query.filter(Model.kpi_sample_type.in_(filter_object.kpi_sample_type))
 
             if filter_object.device_id:
-                query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
+                query = query.filter(Model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
 
             if filter_object.endpoint_id:
-                query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
+                query = query.filter(Model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
 
             if filter_object.service_id:
-                query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
+                query = query.filter(Model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
 
             if filter_object.slice_id:
-                query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
+                query = query.filter(Model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
 
             if filter_object.connection_id:
-                query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
+                query = query.filter(Model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
 
             if filter_object.link_id:
-                query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
+                query = query.filter(Model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
         except Exception as e:
             LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, KpiModel) + return super().select_with_filter(query, session, Model) diff --git a/src/telemetry/database/new_Telemetry_DB.py b/src/telemetry/database/new_Telemetry_DB.py new file mode 100644 index 000000000..bd9d863d2 --- /dev/null +++ b/src/telemetry/database/new_Telemetry_DB.py @@ -0,0 +1,57 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from .TelemetryModel import Collector as Model +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') + +class TelemetryDB(Database): + def __init__(self) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__("tfs_telemetry", Model) + + def add_row_to_db(self, row): + return super().add_row_to_db(row) + + def search_db_row_by_id(self, model, col_name, id_to_search): + return super().search_db_row_by_id(model, col_name, id_to_search) + + def delete_db_row_by_id(self, model, col_name, id_to_search): + return super().delete_db_row_by_id(model, col_name, id_to_search) + + def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. + return: SQLAlchemy session, query and Model + """ + session = self.Session() + try: + query = session.query(Model) + # Apply filters based on the filter_object + if filter_object.kpi_id: + query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + # query should be added to return all rows + except Exception as e: + LOGGER.error(f"Error creating filter of {model.__name__} table. 
ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, Model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index b73d9fa95..9068de0db 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -25,7 +25,8 @@ from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, Collecto from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.Telemetry_DB import TelemetryDB +# from telemetry.database.Telemetry_DB import TelemetryDB +from ...database.new_Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py index c4976f8c2..707e6b5b2 100644 --- a/src/telemetry/tests/test_telemetryDB.py +++ b/src/telemetry/tests/test_telemetryDB.py @@ -14,15 +14,15 @@ import logging -from telemetry.database.Telemetry_DB import TelemetryDB +from telemetry.database.new_Telemetry_DB import TelemetryDB LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') TelemetryDBobj = TelemetryDB() - TelemetryDBobj.drop_database() - TelemetryDBobj.verify_tables() + # TelemetryDBobj.drop_database() + # TelemetryDBobj.verify_tables() TelemetryDBobj.create_database() TelemetryDBobj.create_tables() TelemetryDBobj.verify_tables() \ No newline at end of file -- GitLab From 4d6ef02b399b5d7fef4d822d38fd4bb291c5cb67 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Thu, 26 Sep 2024 11:23:08 +0000 Subject: [PATCH 04/13] Refactor AnalyticsFrontend to reflect Generic DB changes --- src/analytics/database/new_Analyzer_DB.py | 67 +++++++++++++++++++ .../AnalyticsFrontendServiceServicerImpl.py | 9 ++- src/analytics/frontend/tests/test_frontend.py | 38 +++++------ src/analytics/tests/test_analytics_db.py | 2 +- src/kpi_manager/database/new_KpiDB.py | 2 +- 5 files changed, 88 insertions(+), 30 deletions(-) create mode 100644 src/analytics/database/new_Analyzer_DB.py diff --git a/src/analytics/database/new_Analyzer_DB.py b/src/analytics/database/new_Analyzer_DB.py new file mode 100644 index 000000000..3b3a74641 --- /dev/null +++ b/src/analytics/database/new_Analyzer_DB.py @@ -0,0 +1,67 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

From 4d6ef02b399b5d7fef4d822d38fd4bb291c5cb67 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Thu, 26 Sep 2024 11:23:08 +0000
Subject: [PATCH 04/13] Refactor AnalyticsFrontend to reflect Generic DB
 changes

---
 src/analytics/database/new_Analyzer_DB.py        | 67 +++++++++++++++++++
 .../AnalyticsFrontendServiceServicerImpl.py      |  9 ++-
 src/analytics/frontend/tests/test_frontend.py    | 38 +++++------
 src/analytics/tests/test_analytics_db.py         |  2 +-
 src/kpi_manager/database/new_KpiDB.py            |  2 +-
 5 files changed, 88 insertions(+), 30 deletions(-)
 create mode 100644 src/analytics/database/new_Analyzer_DB.py

diff --git a/src/analytics/database/new_Analyzer_DB.py b/src/analytics/database/new_Analyzer_DB.py
new file mode 100644
index 000000000..3b3a74641
--- /dev/null
+++ b/src/analytics/database/new_Analyzer_DB.py
@@ -0,0 +1,67 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from common.method_wrappers.Decorator import MetricsPool
+from .AnalyzerModel import Analyzer as Model
+from common.tools.database.GenericDatabase import Database
+from common.method_wrappers.ServiceExceptions import OperationFailedException
+
+LOGGER = logging.getLogger(__name__)
+METRICS_POOL = MetricsPool('KpiManager', 'Database')
+
+class AnalyzerDB(Database):
+    def __init__(self) -> None:
+        LOGGER.info('Init KpiManagerService')
+        super().__init__("tfs_kpi_mgmt", Model)
+
+    def add_row_to_db(self, row):
+        return super().add_row_to_db(row)
+
+    def search_db_row_by_id(self, model, col_name, id_to_search):
+        return super().search_db_row_by_id(model, col_name, id_to_search)
+
+    def delete_db_row_by_id(self, model, col_name, id_to_search):
+        return super().delete_db_row_by_id(model, col_name, id_to_search)
+
+    def select_with_filter(self, model, filter_object):
+        """
+        Generic method to create filters dynamically based on filter_object attributes.
+        params: model: SQLAlchemy model class to query.
+                filter_object: Object that contains filtering criteria as attributes.
+        return: SQLAlchemy session, query and Model
+        """
+        session = self.Session()
+        try:
+            query = session.query(Model)
+            # Apply filters based on the filter_object
+            if filter_object.analyzer_id:
+                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
+
+            if filter_object.algorithm_names:
+                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
+
+            if filter_object.input_kpi_ids:
+                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
+                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
+
+            if filter_object.output_kpi_ids:
+                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
+                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
+        except Exception as e:
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
+
+        return super().select_with_filter(query, session, Model)
\ No newline at end of file

diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 8bb6a17af..5ea0e2ef6 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -25,7 +25,7 @@ from common.proto.context_pb2 import Empty
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
-from analytics.database.Analyzer_DB import AnalyzerDB
+from analytics.database.new_Analyzer_DB import AnalyzerDB
 from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
@@ -84,7 +84,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
             LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
         self.kafka_producer.flush()
-        # self.StartResponseListener(analyzer_uuid)
 
     def StartResponseListener(self, filter_key=None):
         """
@@ -209,6 +208,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
             print ('Message delivery failed: {:}'.format(err))
-        # else:
-        #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
-        #     print('Message delivered to topic {:}'.format(msg.topic()))
+        else:
+            LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+            print('Message delivered to topic {:}'.format(msg.topic()))

diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py
index d2428c01f..f7a25f4c7 100644
--- a/src/analytics/frontend/tests/test_frontend.py
+++ b/src/analytics/frontend/tests/test_frontend.py
@@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 ###########################
 
 # --- "test_validate_kafka_topics" should be executed before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)
 
 # ----- core funtionality test -----
 # def test_StartAnalytics(analyticsFrontend_client):
@@ -102,27 +102,19 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
     LOGGER.info('--> StartAnalyzer')
     added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
     LOGGER.debug(str(added_analyzer_id))
-    LOGGER.info(' --> Calling StartResponseListener... ')
-    class_obj = AnalyticsFrontendServiceServicerImpl()
-    response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid)
-    LOGGER.debug(response)
-    LOGGER.info("waiting for timer to comlete ...")
-    time.sleep(3)
-    LOGGER.info('--> StopAnalyzer')
-    response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
-    LOGGER.debug(str(response))
+    assert isinstance(added_analyzer_id, AnalyzerId)
 
-# def test_SelectAnalytics(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
-#     response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, AnalyzerList)
+def test_StopAnalytic(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StopAnalytic START: <<< ')
+    response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
+    LOGGER.debug(str(response))
+    assert isinstance(response, Empty)
 
-# def test_StopAnalytic(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
-#     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
+def test_SelectAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+    LOGGER.debug(str(response))
+    assert isinstance(response, AnalyzerList)
 
 # def test_ResponseListener():
 #     LOGGER.info(' >>> test_ResponseListener START <<< ')

diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py
index 58e7d0167..f944fc0b5 100644
--- a/src/analytics/tests/test_analytics_db.py
+++ b/src/analytics/tests/test_analytics_db.py
@@ -14,7 +14,7 @@
 
 import logging
-from analytics.database.Analyzer_DB import AnalyzerDB
+from analytics.database.new_Analyzer_DB import AnalyzerDB
 
 LOGGER = logging.getLogger(__name__)

diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/new_KpiDB.py
index 64d2dc10b..8aa1cd20b 100644
--- a/src/kpi_manager/database/new_KpiDB.py
+++ b/src/kpi_manager/database/new_KpiDB.py
@@ -15,7 +15,7 @@
 import logging
 
 from common.method_wrappers.Decorator import MetricsPool
-from kpi_manager.database.KpiModel import Kpi as Model
+from .KpiModel import Kpi as Model
 from common.tools.database.GenericDatabase import Database
 from common.method_wrappers.ServiceExceptions import OperationFailedException
--
GitLab
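
One detail worth noting in the analyzer filter above: input_kpi_ids and
output_kpi_ids are array columns, so membership is tested with the
PostgreSQL/CockroachDB array-overlap operator `&&` via `.op('&&')` rather than an
`IN` clause. A standalone sketch of the same construct; the minimal Analyzer model
here is hypothetical, only the operator usage mirrors the patch:

    # Standalone sketch of the array-overlap filter used by AnalyzerDB above.
    import sqlalchemy
    from sqlalchemy.dialects.postgresql import ARRAY
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Analyzer(Base):  # minimal stand-in for the real AnalyzerModel
        __tablename__ = 'analyzer'
        analyzer_id   = sqlalchemy.Column(sqlalchemy.String, primary_key=True)
        input_kpi_ids = sqlalchemy.Column(ARRAY(sqlalchemy.String))

    def overlapping(session, kpi_uuids):
        # '&&' is true when the column array and kpi_uuids share at least one element
        return session.query(Analyzer).filter(
            Analyzer.input_kpi_ids.op('&&')(kpi_uuids)).all()
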

From 424f46b856eda292897d847a89d351cff183f8c9 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Thu, 26 Sep 2024 12:24:54 +0000
Subject: [PATCH 05/13] Minor changes in Kpi, Telemetry and Analytics DB
 scripts.

---
 scripts/run_tests_locally-kpi-DB.sh             | 2 +-
 scripts/run_tests_locally-telemetry-DB.sh       | 3 ++-
 scripts/run_tests_locally-telemetry-backend.sh  | 5 ++---
 scripts/run_tests_locally-telemetry-frontend.sh | 6 ++----
 4 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh
index 4953b49e0..29c659510 100755
--- a/scripts/run_tests_locally-kpi-DB.sh
+++ b/scripts/run_tests_locally-kpi-DB.sh
@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
 # python3 kpi_manager/tests/test_unitary.py
 
 RCFILE=$PROJECTDIR/coverage/.coveragerc
-CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     kpi_manager/tests/test_kpi_db.py

diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh
index 4b9a41760..85cb8664a 100755
--- a/scripts/run_tests_locally-telemetry-DB.sh
+++ b/scripts/run_tests_locally-telemetry-DB.sh
@@ -20,7 +20,8 @@ cd $PROJECTDIR/src
 # RCFILE=$PROJECTDIR/coverage/.coveragerc
 # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
 #     kpi_manager/tests/test_unitary.py
-
+CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
     telemetry/tests/test_telemetryDB.py

diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index 79db05fcf..97a06a0d6 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -19,10 +19,9 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 # RCFILE=$PROJECTDIR/coverage/.coveragerc
 # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
-#     kpi_manager/tests/test_unitary.py
-
-# python3 kpi_manager/tests/test_unitary.py
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+
+
 python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
     telemetry/backend/tests/test_TelemetryBackend.py

diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh
index a2a1de523..7506be5e0 100755
--- a/scripts/run_tests_locally-telemetry-frontend.sh
+++ b/scripts/run_tests_locally-telemetry-frontend.sh
@@ -17,11 +17,9 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
-# RCFILE=$PROJECTDIR/coverage/.coveragerc
-# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
-#     kpi_manager/tests/test_unitary.py
 
-# python3 kpi_manager/tests/test_unitary.py
+CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
--
GitLab
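
These scripts work because Engine.get_engine (PATCH 01) checks CRDB_URI first and
only falls back to assembling the in-cluster URI from the individual CRDB_*
settings. A sketch of the equivalent check, using plain os.environ in place of the
project's get_setting helper:

    # Sketch of the override the local test scripts rely on: when CRDB_URI is
    # exported (as run_tests_locally-*.sh do), it wins over in-cluster settings.
    import os

    def resolve_crdb_uri(db_name: str) -> str:
        uri = os.environ.get('CRDB_URI')  # set by the local test scripts
        if uri is not None:
            return uri
        # otherwise build the in-cluster URI, as CRDB_URI_TEMPLATE does in PATCH 01
        return ('cockroachdb://{user}:{pwd}@cockroachdb-public.{ns}'
                '.svc.cluster.local:{port}/{db}?sslmode={ssl}').format(
            user=os.environ['CRDB_USERNAME'], pwd=os.environ['CRDB_PASSWORD'],
            ns=os.environ['CRDB_NAMESPACE'], port=os.environ['CRDB_SQL_PORT'],
            db=db_name, ssl=os.environ['CRDB_SSLMODE'])
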

From aec985e75664ab9ef4142cd5a91df46758557403 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Mon, 30 Sep 2024 07:47:38 +0000
Subject: [PATCH 06/13] Refactor in Analytics, Telemetry, and KpiManager
 Services and Manifest Files to Reflect Generic DB Changes.

- Improvements made to `tfs.sh` script.
- Added `CRDB_DATABASE` environment variable to all monitoring service manifest
  files and the context manifest file.
- Removed the DB engine file from all monitoring services and moved it to
  `common.tools.database`.
- Each `_DB.py` will now retrieve the database name from the environment
  variable.
---
 deploy/tfs.sh                                    |  36 ++--
 manifests/analyticsservice.yaml                  |   2 +
 manifests/contextservice.yaml                    |   2 +
 manifests/kpi_managerservice.yaml                |   2 +
 manifests/telemetryservice.yaml                  |   2 +
 src/analytics/database/AnalyzerEngine.py         |  40 -----
 src/analytics/database/Analyzer_DB.py            | 145 +++--------------
 src/analytics/database/new_Analyzer_DB.py        |  67 --------
 .../AnalyticsFrontendServiceServicerImpl.py      |   2 +-
 .../database/{new_KpiDB.py => KpiDB.py}          |  15 +-
 src/kpi_manager/database/KpiEngine.py            |  40 -----
 src/kpi_manager/database/Kpi_DB.py               | 154 ------------------
 .../service/KpiManagerServiceServicerImpl.py     |   2 +-
 src/telemetry/database/TelemetryEngine.py        |  40 -----
 src/telemetry/database/Telemetry_DB.py           | 142 +++-------------
 src/telemetry/database/new_Telemetry_DB.py       |  57 -------
 .../TelemetryFrontendServiceServicerImpl.py      |   2 +-
 17 files changed, 87 insertions(+), 663 deletions(-)
 delete mode 100644 src/analytics/database/AnalyzerEngine.py
 delete mode 100644 src/analytics/database/new_Analyzer_DB.py
 rename src/kpi_manager/database/{new_KpiDB.py => KpiDB.py} (87%)
 delete mode 100644 src/kpi_manager/database/KpiEngine.py
 delete mode 100644 src/kpi_manager/database/Kpi_DB.py
 delete mode 100644 src/telemetry/database/TelemetryEngine.py
 delete mode 100644 src/telemetry/database/new_Telemetry_DB.py

diff --git a/deploy/tfs.sh b/deploy/tfs.sh
index da078a4f3..4268d50b2 100755
--- a/deploy/tfs.sh
+++ b/deploy/tfs.sh
@@ -146,55 +146,57 @@ kubectl create namespace $TFS_K8S_NAMESPACE
 sleep 2
 printf "\n"
 
-echo "Create secret with CockroachDB data"
+echo ">>> Create Secret with CockroachDB data..."
+echo "For Context" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_CONTEXT=${CRDB_DATABASE} # TODO: change by specific configurable environment variable +# CRDB_DATABASE_CONTEXT="${CRDB_DATABASE}_context" # TODO: change by specific configurable environment variable kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require -printf "\n" +# printf "\n" + # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \ -echo "Create secret with CockroachDB data for KPI Management microservices" +echo "For KPI Management" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable +# CRDB_DATABASE_KPI_MGMT="${CRDB_DATABASE}_kpi" kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require -printf "\n" +# printf "\n" + # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \ -echo "Create secret with CockroachDB data for Telemetry microservices" +echo "For Telemetry" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: change by specific configurable environment variable +# CRDB_DATABASE_TELEMETRY="${CRDB_DATABASE}_telemetry" kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require -printf "\n" +# printf "\n" + # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \ -echo "Create secret with CockroachDB data for Analytics microservices" +echo "For Analytics" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: change by specific configurable environment variable +# CRDB_DATABASE_ANALYTICS="${CRDB_DATABASE}_analytics" kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require -printf "\n" +# printf "\n" + # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \ -echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices" +echo ">>> Create Secret with 
Apache Kakfa" +echo "For KPI, Telemetry and Analytics" KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml index 0fa3ed0be..7340dff5f 100644 --- a/manifests/analyticsservice.yaml +++ b/manifests/analyticsservice.yaml @@ -37,6 +37,8 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_analytics" envFrom: - secretRef: name: crdb-analytics diff --git a/manifests/contextservice.yaml b/manifests/contextservice.yaml index 3abc4f208..0fc8a1c44 100644 --- a/manifests/contextservice.yaml +++ b/manifests/contextservice.yaml @@ -45,6 +45,8 @@ spec: value: "FALSE" - name: ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY value: "FALSE" + - name: CRDB_DATABASE + value: "tfs_context" envFrom: - secretRef: name: crdb-data diff --git a/manifests/kpi_managerservice.yaml b/manifests/kpi_managerservice.yaml index 984d783a9..efc3a720d 100644 --- a/manifests/kpi_managerservice.yaml +++ b/manifests/kpi_managerservice.yaml @@ -39,6 +39,8 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_kpi" envFrom: - secretRef: name: crdb-kpi-data diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 2f9917499..7c781bb3d 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -37,6 +37,8 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_kpi" envFrom: - secretRef: name: crdb-telemetry diff --git a/src/analytics/database/AnalyzerEngine.py b/src/analytics/database/AnalyzerEngine.py deleted file mode 100644 index 9294e0996..000000000 --- a/src/analytics/database/AnalyzerEngine.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-import logging, sqlalchemy
-from common.Settings import get_setting
-
-LOGGER = logging.getLogger(__name__)
-CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
-
-class AnalyzerEngine:
-    @staticmethod
-    def get_engine() -> sqlalchemy.engine.Engine:
-        crdb_uri = get_setting('CRDB_URI', default=None)
-        if crdb_uri is None:
-            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
-            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = "tfs-analyzer" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
-            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
-            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
-            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
-            crdb_uri = CRDB_URI_TEMPLATE.format(
-                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        try:
-            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
-            LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri))
-        except: # pylint: disable=bare-except # pragma: no cover
-            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
-            return None # type: ignore
-        return engine

diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py
index 1ba68989a..8420b66b5 100644
--- a/src/analytics/database/Analyzer_DB.py
+++ b/src/analytics/database/Analyzer_DB.py
@@ -13,138 +13,47 @@
 # limitations under the License.
 
 import logging
-import sqlalchemy_utils
+from common.Settings import get_setting
+from common.method_wrappers.Decorator import MetricsPool
+from .AnalyzerModel import Analyzer as Model
+from common.tools.database.GenericDatabase import Database
+from common.method_wrappers.ServiceExceptions import OperationFailedException
 
-from sqlalchemy import inspect, or_
-from sqlalchemy.orm import sessionmaker
+LOGGER = logging.getLogger(__name__)
+METRICS_POOL = MetricsPool('KpiManager', 'Database')
+DB_NAME = get_setting('CRDB_DATABASE', default=None)
 
-from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
-from analytics.database.AnalyzerEngine import AnalyzerEngine
-from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
+class AnalyzerDB(Database):
+    def __init__(self) -> None:
+        LOGGER.info('Init KpiManagerService')
+        super().__init__(DB_NAME, Model)
 
-LOGGER = logging.getLogger(__name__)
-DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable
-
-class AnalyzerDB:
-    def __init__(self):
-        self.db_engine = AnalyzerEngine.get_engine()
-        if self.db_engine is None:
-            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
-            return False
-        self.db_name = DB_NAME
-        self.Session = sessionmaker(bind=self.db_engine)
-
-    def create_database(self):
-        if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
-            sqlalchemy_utils.create_database(self.db_engine.url)
-
-    def drop_database(self) -> None:
-        if sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.drop_database(self.db_engine.url)
-
-    def create_tables(self):
-        try:
-            AnalyzerModel.metadata.create_all(self.db_engine)     # type: ignore
-            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
-        except Exception as e:
-            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
{:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - inspect_object = inspect(self.db_engine) - if(inspect_object.has_table('analyzer', None)): - LOGGER.info("Table exists in DB: {:}".format(self.db_name)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - -# ----------------- CURD OPERATIONS --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. 
+        return: SQLAlchemy session, query and Model
+        """
         session = self.Session()
         try:
-            query = session.query(AnalyzerModel)
-
+            query = session.query(Model)
             # Apply filters based on the filter_object
             if filter_object.analyzer_id:
-                query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
+                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
 
             if filter_object.algorithm_names:
-                query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
+                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
 
             if filter_object.input_kpi_ids:
                 input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
-                query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
+                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
 
             if filter_object.output_kpi_ids:
                 output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
-                query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
-
-            result = query.all()
-            # query should be added to return all rows
-            if result:
-                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") #  - Results: {result}
-            else:
-                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
-            return result
+                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
         except Exception as e:
-            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
-            raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
-        finally:
-            session.close()
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
+
+        return super().select_with_filter(query, session, Model)
\ No newline at end of file

diff --git a/src/analytics/database/new_Analyzer_DB.py b/src/analytics/database/new_Analyzer_DB.py
deleted file mode 100644
index 3b3a74641..000000000
--- a/src/analytics/database/new_Analyzer_DB.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from common.method_wrappers.Decorator import MetricsPool
-from .AnalyzerModel import Analyzer as Model
-from common.tools.database.GenericDatabase import Database
-from common.method_wrappers.ServiceExceptions import OperationFailedException
-
-LOGGER = logging.getLogger(__name__)
-METRICS_POOL = MetricsPool('KpiManager', 'Database')
-
-class AnalyzerDB(Database):
-    def __init__(self) -> None:
-        LOGGER.info('Init KpiManagerService')
-        super().__init__("tfs_kpi_mgmt", Model)
-
-    def add_row_to_db(self, row):
-        return super().add_row_to_db(row)
-
-    def search_db_row_by_id(self, model, col_name, id_to_search):
-        return super().search_db_row_by_id(model, col_name, id_to_search)
-
-    def delete_db_row_by_id(self, model, col_name, id_to_search):
-        return super().delete_db_row_by_id(model, col_name, id_to_search)
-
-    def select_with_filter(self, model, filter_object):
-        """
-        Generic method to create filters dynamically based on filter_object attributes.
-        params: model: SQLAlchemy model class to query.
-                filter_object: Object that contains filtering criteria as attributes.
-        return: SQLAlchemy session, query and Model
-        """
-        session = self.Session()
-        try:
-            query = session.query(Model)
-            # Apply filters based on the filter_object
-            if filter_object.analyzer_id:
-                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
-
-            if filter_object.algorithm_names:
-                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
-
-            if filter_object.input_kpi_ids:
-                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
-                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
-
-            if filter_object.output_kpi_ids:
-                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
-                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
-        except Exception as e:
-            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
-            raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
-
-        return super().select_with_filter(query, session, Model)
\ No newline at end of file

diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 5ea0e2ef6..ab274ba36 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -25,7 +25,7 @@ from common.proto.context_pb2 import Empty
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
-from analytics.database.new_Analyzer_DB import AnalyzerDB
new_Analyzer_DB import AnalyzerDB +from analytics.database.Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/KpiDB.py similarity index 87% rename from src/kpi_manager/database/new_KpiDB.py rename to src/kpi_manager/database/KpiDB.py index 8aa1cd20b..cd1b20e19 100644 --- a/src/kpi_manager/database/new_KpiDB.py +++ b/src/kpi_manager/database/KpiDB.py @@ -13,28 +13,21 @@ # limitations under the License. import logging +from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool from .KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') +DB_NAME = get_setting('CRDB_DATABASE', default=None) class KpiDB(Database): def __init__(self) -> None: LOGGER.info('Init KpiManagerService') - super().__init__("tfs_kpi_mgmt", Model) - - def add_row_to_db(self, row): - return super().add_row_to_db(row) - - def search_db_row_by_id(self, model, col_name, id_to_search): - return super().search_db_row_by_id(model, col_name, id_to_search) - - def delete_db_row_by_id(self, model, col_name, id_to_search): - return super().delete_db_row_by_id(model, col_name, id_to_search) + super().__init__(DB_NAME, Model) def select_with_filter(self, model, filter_object): """ diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py deleted file mode 100644 index 0fce7e3d3..000000000 --- a/src/kpi_manager/database/KpiEngine.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import logging, sqlalchemy -from common.Settings import get_setting - -LOGGER = logging.getLogger(__name__) -CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' - -class KpiEngine: - @staticmethod - def get_engine() -> sqlalchemy.engine.Engine: - crdb_uri = get_setting('CRDB_URI', default=None) - if crdb_uri is None: - CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') - CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT') - CRDB_USERNAME = get_setting('CRDB_USERNAME') - CRDB_PASSWORD = get_setting('CRDB_PASSWORD') - CRDB_SSLMODE = get_setting('CRDB_SSLMODE') - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - try: - engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri)) - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) - return None # type: ignore - return engine diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py deleted file mode 100644 index 49ad9c9b5..000000000 --- a/src/kpi_manager/database/Kpi_DB.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import sqlalchemy_utils -from sqlalchemy.orm import sessionmaker -from kpi_manager.database.KpiEngine import KpiEngine -from kpi_manager.database.KpiModel import Kpi as KpiModel -from common.method_wrappers.ServiceExceptions import ( - AlreadyExistsException, OperationFailedException , NotFoundException) - -LOGGER = logging.getLogger(__name__) -DB_NAME = "tfs_kpi_mgmt" - -class KpiDB: - def __init__(self): - self.db_engine = KpiEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self) -> None: - if not sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.create_database(self.db_engine.url) - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) - - def drop_database(self) -> None: - if sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.drop_database(self.db_engine.url) - - def create_tables(self): - # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables. - try: - KpiModel.metadata.create_all(self.db_engine) # type: ignore - LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name)) - except Exception as e: - LOGGER.debug("Tables cannot be created in the kpi database. 
{:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - with self.db_engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() # type: ignore - LOGGER.debug("Tables verified: {:}".format(tables)) - except Exception as e: - LOGGER.debug("Unable to fetch Table names. {:s}".format(str(e))) - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def select_with_filter(self, model, filter_object): - session = self.Session() - try: - query = session.query(KpiModel) - # Apply filters based on the filter_object - if filter_object.kpi_id: - query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - - if filter_object.kpi_sample_type: - query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type)) - - if filter_object.device_id: - query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) - - if filter_object.endpoint_id: - query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) - - if filter_object.service_id: - query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) - - if filter_object.slice_id: - query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) - - if 
filter_object.connection_id: - query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) - - if filter_object.link_id: - query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) - result = query.all() - - if result: - LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} - else: - LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}") - return result - except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") - raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) - finally: - session.close() diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 39bc9e0b0..57acf3afe 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -19,7 +19,7 @@ from common.proto.context_pb2 import Empty from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList # from kpi_manager.database.Kpi_DB import KpiDB -from kpi_manager.database.new_KpiDB import KpiDB +from kpi_manager.database.KpiDB import KpiDB from kpi_manager.database.KpiModel import Kpi as KpiModel LOGGER = logging.getLogger(__name__) diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py deleted file mode 100644 index 7c8620faf..000000000 --- a/src/telemetry/database/TelemetryEngine.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import logging, sqlalchemy -from common.Settings import get_setting - -LOGGER = logging.getLogger(__name__) -CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' - -class TelemetryEngine: - @staticmethod - def get_engine() -> sqlalchemy.engine.Engine: - crdb_uri = get_setting('CRDB_URI', default=None) - if crdb_uri is None: - CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') - CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = "tfs-telemetry" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT') - CRDB_USERNAME = get_setting('CRDB_USERNAME') - CRDB_PASSWORD = get_setting('CRDB_PASSWORD') - CRDB_SSLMODE = get_setting('CRDB_SSLMODE') - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - try: - engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri)) - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) - return None # type: ignore - return engine diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index 32acfd73a..66cbfe77f 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -13,125 +13,35 @@ # limitations under the License. import logging -import sqlalchemy_utils -from sqlalchemy import inspect -from sqlalchemy.orm import sessionmaker -from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.TelemetryEngine import TelemetryEngine -from common.method_wrappers.ServiceExceptions import ( - OperationFailedException, AlreadyExistsException ) - -LOGGER = logging.getLogger(__name__) -DB_NAME = "tfs_telemetry" - -class TelemetryDB: - def __init__(self): - self.db_engine = TelemetryEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self): - if not sqlalchemy_utils.database_exists(self.db_engine.url): - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) - sqlalchemy_utils.create_database(self.db_engine.url) - - def drop_database(self) -> None: - if sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.drop_database(self.db_engine.url) - - def create_tables(self): - try: - CollectorModel.metadata.create_all(self.db_engine) # type: ignore - LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) - except Exception as e: - LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - inspect_object = inspect(self.db_engine) - if(inspect_object.has_table('collector', None)): - LOGGER.info("Table exists in DB: {:}".format(self.db_name)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. 
{:s}".format(str(e))) - -# ----------------- CURD METHODs --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() +from common.Settings import get_setting +from common.method_wrappers.Decorator import MetricsPool +from .TelemetryModel import Collector as Model +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') +DB_NAME = get_setting('CRDB_DATABASE', default=None) + +class TelemetryDB(Database): + def __init__(self) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__(DB_NAME, Model) def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. 
+ return: SQLAlchemy session, query and Model + """ session = self.Session() try: - query = session.query(CollectorModel) - # Apply filters based on the filter_object + query = session.query(Model) if filter_object.kpi_id: - query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - result = query.all() - # query should be added to return all rows - if result: - LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} - else: - LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") - return result + query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") - raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) - finally: - session.close() - + LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, Model) diff --git a/src/telemetry/database/new_Telemetry_DB.py b/src/telemetry/database/new_Telemetry_DB.py deleted file mode 100644 index bd9d863d2..000000000 --- a/src/telemetry/database/new_Telemetry_DB.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from .TelemetryModel import Collector as Model -from common.tools.database.GenericDatabase import Database -from common.method_wrappers.ServiceExceptions import OperationFailedException - -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') - -class TelemetryDB(Database): - def __init__(self) -> None: - LOGGER.info('Init KpiManagerService') - super().__init__("tfs_telemetry", Model) - - def add_row_to_db(self, row): - return super().add_row_to_db(row) - - def search_db_row_by_id(self, model, col_name, id_to_search): - return super().search_db_row_by_id(model, col_name, id_to_search) - - def delete_db_row_by_id(self, model, col_name, id_to_search): - return super().delete_db_row_by_id(model, col_name, id_to_search) - - def select_with_filter(self, model, filter_object): - """ - Generic method to create filters dynamically based on filter_object attributes. - params: model: SQLAlchemy model class to query. - filter_object: Object that contains filtering criteria as attributes. 
- return: SQLAlchemy session, query and Model - """ - session = self.Session() - try: - query = session.query(Model) - # Apply filters based on the filter_object - if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - # query should be added to return all rows - except Exception as e: - LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") - raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - - return super().select_with_filter(query, session, Model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 9068de0db..43ac9455e 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -26,7 +26,7 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer from telemetry.database.TelemetryModel import Collector as CollectorModel # from telemetry.database.Telemetry_DB import TelemetryDB -from ...database.new_Telemetry_DB import TelemetryDB +from ...database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -- GitLab From 0306b222f8e25768b0a934ae4d49e7f47e9be177 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 30 Sep 2024 16:14:32 +0000 Subject: [PATCH 07/13] Changes to Address Minor Issues in Monitoring Services Deployment and Build: - Improved Kafka scripts. - Updated manifests to handle ports for backend and frontend. - Changed the return type of a method in settings to `int`. - Added DB deployment in the service's main file. --- deploy/kafka.sh | 14 +- manifests/analyticsservice.yaml | 12 +- manifests/servicemonitors.yaml | 153 +++++++++++++++++++++ manifests/telemetryservice.yaml | 12 +- src/analytics/backend/service/__main__.py | 2 +- src/analytics/frontend/service/__main__.py | 14 +- src/analytics/tests/test_analytics_db.py | 2 +- src/common/Settings.py | 10 +- src/common/tools/kafka/Variables.py | 10 +- src/kpi_manager/service/__main__.py | 13 +- src/kpi_manager/tests/test_kpi_db.py | 2 +- src/telemetry/backend/service/__main__.py | 2 +- src/telemetry/frontend/service/__main__.py | 12 +- src/telemetry/tests/test_telemetryDB.py | 2 +- 14 files changed, 230 insertions(+), 30 deletions(-) diff --git a/deploy/kafka.sh b/deploy/kafka.sh index 0483bce15..4cbcdb701 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -47,10 +47,10 @@ function kafka_deploy() { cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" # echo "Apache Kafka Namespace" - echo ">>> Delete Apache Kafka Namespace" + echo "Delete Apache Kafka Namespace" kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found - echo ">>> Create Apache Kafka Namespace" + echo "Create Apache Kafka Namespace" kubectl create namespace ${KFK_NAMESPACE} # echo ">>> Deplying Apache Kafka Zookeeper" @@ -76,15 +76,15 @@ function kafka_deploy() { # fi } -echo "Apache Kafka" -echo ">>> Checking if Apache Kafka is deployed ... " +echo ">>> Apache Kafka" +echo "Checking if Apache Kafka is deployed ... 
" if [ "$KFK_REDEPLOY" == "YES" ]; then - echo ">>> Redeploying kafka namespace" + echo "Redeploying kafka namespace" kafka_deploy elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then - echo ">>> Apache Kafka already present; skipping step." + echo "Apache Kafka already present; skipping step." else - echo ">>> Kafka namespace doesn't exists. Deploying kafka namespace" + echo "Kafka namespace doesn't exists. Deploying kafka namespace" kafka_deploy fi echo diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml index 7340dff5f..e15214dbd 100644 --- a/manifests/analyticsservice.yaml +++ b/manifests/analyticsservice.yaml @@ -39,6 +39,8 @@ spec: value: "INFO" - name: CRDB_DATABASE value: "tfs_analytics" + - name: METRICS_PORT + value: "9192" envFrom: - secretRef: name: crdb-analytics @@ -62,10 +64,12 @@ spec: imagePullPolicy: Always ports: - containerPort: 30090 - - containerPort: 9192 + - containerPort: 9193 env: - name: LOG_LEVEL value: "INFO" + - name: METRICS_PORT + value: "9193" envFrom: - secretRef: name: kfk-kpi-data @@ -102,10 +106,14 @@ spec: protocol: TCP port: 30090 targetPort: 30090 - - name: metrics + - name: metrics-frontend protocol: TCP port: 9192 targetPort: 9192 + - name: metrics-backend + protocol: TCP + port: 9193 + targetPort: 9193 --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler diff --git a/manifests/servicemonitors.yaml b/manifests/servicemonitors.yaml index 716c1c689..8a8fe6f39 100644 --- a/manifests/servicemonitors.yaml +++ b/manifests/servicemonitors.yaml @@ -475,3 +475,156 @@ spec: any: false matchNames: - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-analyticsservice-metric + labels: + app: analyticsservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: analyticsservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics-frontend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + - port: metrics-backend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-telemetryservice-metric + labels: + app: telemetryservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) 
+spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: telemetryservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics-frontend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + - port: metrics-backend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi-managerservice-metric + labels: + app: kpi-managerservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi-managerservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi_value_apiservice-metric + labels: + app: kpi_value_apiservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi_value_apiservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi_value_writerservice-metric + labels: + app: kpi_value_writerservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) 
+spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi_value_writerservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 7c781bb3d..9c979713d 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -39,6 +39,8 @@ spec: value: "INFO" - name: CRDB_DATABASE value: "tfs_kpi" + - name: METRICS_PORT + value: "9192" envFrom: - secretRef: name: crdb-telemetry @@ -62,10 +64,12 @@ spec: imagePullPolicy: Always ports: - containerPort: 30060 - - containerPort: 9192 + - containerPort: 9193 env: - name: LOG_LEVEL value: "INFO" + - name: METRICS_PORT + value: "9193" envFrom: - secretRef: name: kfk-kpi-data @@ -102,10 +106,14 @@ spec: protocol: TCP port: 30060 targetPort: 30060 - - name: metrics + - name: metrics-frontend protocol: TCP port: 9192 targetPort: 9192 + - name: metrics-backend + protocol: TCP + port: 9193 + targetPort: 9193 --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 3c4c36b7c..9b1941e20 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = get_metrics_port() + metrics_port = int(get_metrics_port()) start_http_server(metrics_port) grpc_service = AnalyticsBackendService() diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index 6c331844f..bf94eb5e6 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -16,9 +16,14 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService +from analytics.database.AnalyzerModel import Analyzer as Model +from common.tools.database.GenericDatabase import Database +from common.Settings import get_setting + +DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() -LOGGER = None +LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') @@ -36,8 +41,13 @@ def main(): LOGGER.info('Starting...') + # To create DB + kpiDBobj = Database(DB_NAME, Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + # Start metrics server - metrics_port = get_metrics_port() + metrics_port = int(get_metrics_port()) start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py index f944fc0b5..58e7d0167 100644 --- a/src/analytics/tests/test_analytics_db.py +++ b/src/analytics/tests/test_analytics_db.py @@ -14,7 +14,7 @@ import logging -from analytics.database.new_Analyzer_DB import AnalyzerDB +from analytics.database.Analyzer_DB import AnalyzerDB LOGGER = logging.getLogger(__name__) diff --git a/src/common/Settings.py b/src/common/Settings.py index eaeb363ad..936c0387b 100644 --- a/src/common/Settings.py +++ b/src/common/Settings.py @@ -79,12 +79,12 @@ def 
get_service_host(service_name : ServiceNameEnum): def get_service_port_grpc(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_GRPC) default_value = DEFAULT_SERVICE_GRPC_PORTS.get(service_name.value) - return get_setting(envvar_name, default=default_value) + return int(get_setting(envvar_name, default=default_value)) def get_service_port_http(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_HTTP) default_value = DEFAULT_SERVICE_HTTP_PORTS.get(service_name.value) - return get_setting(envvar_name, default=default_value) + return int(get_setting(envvar_name, default=default_value)) def get_service_baseurl_http(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_BASEURL_HTTP) @@ -95,16 +95,16 @@ def get_log_level(): return get_setting(ENVVAR_LOG_LEVEL, default=DEFAULT_LOG_LEVEL) def get_metrics_port(): - return get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT) + return int(get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT)) def get_grpc_bind_address(): return get_setting(ENVVAR_GRPC_BIND_ADDRESS, default=DEFAULT_GRPC_BIND_ADDRESS) def get_grpc_max_workers(): - return get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS) + return int(get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS)) def get_grpc_grace_period(): - return get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD) + return int(get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD)) def get_http_bind_address(): return get_setting(ENVVAR_HTTP_BIND_ADDRESS, default=DEFAULT_HTTP_BIND_ADDRESS) diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index fc43c3151..73b633e23 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -25,11 +25,11 @@ class KafkaConfig(Enum): @staticmethod def get_kafka_address() -> str: - # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) - # if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) return kafka_server_address @staticmethod diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py index 244d5afa3..6a3d078e2 100644 --- a/src/kpi_manager/service/__main__.py +++ b/src/kpi_manager/service/__main__.py @@ -16,8 +16,14 @@ import logging, signal, sys, threading from common.Settings import get_log_level from .KpiManagerService import KpiManagerService +from kpi_manager.database.KpiModel import Kpi as Model +from common.tools.database.GenericDatabase import Database +from common.Settings import get_setting + + +DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() -LOGGER = None +LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') @@ -35,6 +41,11 @@ def main(): LOGGER.debug('Starting...') + # To create DB + kpiDBobj = Database(DB_NAME, Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + 
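The bootstrap block added above is intended to be safe on every service start: assuming the generic class keeps the guards used by the per-service code deleted earlier in this series, create_database() first checks sqlalchemy_utils.database_exists() before creating anything, and create_tables() delegates to SQLAlchemy's metadata.create_all(), which by default skips tables that already exist. An illustrative sketch of the startup sequence (DB_NAME is read from the CRDB_DATABASE environment variable; Model is the service's SQLAlchemy model class):

    kpiDBobj = Database(DB_NAME, Model)   # builds engine + sessionmaker
    kpiDBobj.create_database()            # no-op if the database already exists
    kpiDBobj.create_tables()              # create_all() skips existing tables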
grpc_service = KpiManagerService() grpc_service.start() diff --git a/src/kpi_manager/tests/test_kpi_db.py b/src/kpi_manager/tests/test_kpi_db.py index b9c3b2d7c..b1513a83f 100644 --- a/src/kpi_manager/tests/test_kpi_db.py +++ b/src/kpi_manager/tests/test_kpi_db.py @@ -18,7 +18,7 @@ import logging from common.proto.kpi_manager_pb2 import KpiDescriptorList from .test_messages import create_kpi_filter_request from kpi_manager.database.KpiModel import Kpi as KpiModel -from kpi_manager.database.new_KpiDB import KpiDB +from kpi_manager.database.KpiDB import KpiDB # from common.tools.database.GenericDatabase import Database LOGGER = logging.getLogger(__name__) diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 9ec9e191f..8af209e2c 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = get_metrics_port() + metrics_port = int(get_metrics_port()) start_http_server(metrics_port) grpc_service = TelemetryBackendService() diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 2a6c5dbcf..60b96646a 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -16,7 +16,12 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService +from telemetry.database.TelemetryModel import Collector as Model +from common.tools.database.GenericDatabase import Database +from common.Settings import get_setting + +DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() LOGGER = None @@ -36,8 +41,13 @@ def main(): LOGGER.info('Starting...') + # To create DB + kpiDBobj = Database(DB_NAME, Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + # Start metrics server - metrics_port = get_metrics_port() + metrics_port = int(get_metrics_port()) start_http_server(metrics_port) grpc_service = TelemetryFrontendService() diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py index 707e6b5b2..0d5bd0584 100644 --- a/src/telemetry/tests/test_telemetryDB.py +++ b/src/telemetry/tests/test_telemetryDB.py @@ -14,7 +14,7 @@ import logging -from telemetry.database.new_Telemetry_DB import TelemetryDB +from telemetry.database.Telemetry_DB import TelemetryDB LOGGER = logging.getLogger(__name__) -- GitLab From 860333a46fc7b7aa4c839bde4bb858e912d440ec Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 30 Sep 2024 16:15:55 +0000 Subject: [PATCH 08/13] Commented QKD call in main of service and webui --- .gitlab-ci.yml | 1 + src/service/service/__main__.py | 4 ++-- src/webui/service/__main__.py | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 115b33676..cb6ea273b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -49,5 +49,6 @@ include: - local: '/src/kpi_value_api/.gitlab-ci.yml' - local: '/src/kpi_value_writer/.gitlab-ci.yml' - local: '/src/telemetry/.gitlab-ci.yml' + - local: '/src/analytics/.gitlab-ci.yml' # This should be last one: end-to-end integration tests - local: '/src/tests/.gitlab-ci.yml' diff --git a/src/service/service/__main__.py b/src/service/service/__main__.py index 5f9f2fa3a..589d3f673 100644 --- a/src/service/service/__main__.py +++ 
b/src/service/service/__main__.py @@ -44,8 +44,8 @@ def main(): get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) diff --git a/src/webui/service/__main__.py b/src/webui/service/__main__.py index 8ec8dcb64..3c7be3495 100644 --- a/src/webui/service/__main__.py +++ b/src/webui/service/__main__.py @@ -43,8 +43,8 @@ def main(): get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) logger.info('Starting...') -- GitLab From bdfc066bbe56c789240aa0ec3adbb40088b1b847 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 30 Sep 2024 16:47:18 +0000 Subject: [PATCH 09/13] Moved the Model object from the DB class to the implementation class. --- src/analytics/backend/service/__main__.py | 2 +- src/analytics/database/Analyzer_DB.py | 19 ++++++------- .../AnalyticsFrontendServiceServicerImpl.py | 2 +- src/analytics/frontend/service/__main__.py | 2 +- src/common/tools/database/GenericDatabase.py | 8 ++++-- src/kpi_manager/database/KpiDB.py | 28 ++++++++----------- .../service/KpiManagerServiceServicerImpl.py | 2 +- src/telemetry/backend/service/__main__.py | 2 +- src/telemetry/database/Telemetry_DB.py | 13 ++++----- .../TelemetryFrontendServiceServicerImpl.py | 4 +-- src/telemetry/frontend/service/__main__.py | 2 +- 11 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 9b1941e20..3c4c36b7c 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsBackendService() diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py index 8420b66b5..99e9c62f6 100644 --- a/src/analytics/database/Analyzer_DB.py +++ b/src/analytics/database/Analyzer_DB.py @@ -13,20 +13,17 @@ # limitations under the License. 
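The change below inverts the earlier dependency: instead of each DB wrapper importing its own model and resolving the database name itself, the service layer now injects the model class into the wrapper. A minimal usage sketch, matching the call sites updated later in this patch:

    from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
    db_obj = AnalyzerDB(AnalyzerModel)   # the model class is injected by the caller

This keeps GenericDatabase free of per-service imports, so a single wrapper pattern can serve any SQLAlchemy model.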
import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class AnalyzerDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,23 +34,23 @@ class AnalyzerDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: - query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: - query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) + query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] - query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) + query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] - query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) + query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. 
ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) \ No newline at end of file + return super().select_with_filter(query, session, model) \ No newline at end of file diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index ab274ba36..a7fc8d492 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value - self.db_obj = AnalyzerDB() + self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index bf94eb5e6..cc324e2dc 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py index 7c6453d7c..9868401ec 100644 --- a/src/common/tools/database/GenericDatabase.py +++ b/src/common/tools/database/GenericDatabase.py @@ -18,18 +18,20 @@ import sqlalchemy_utils from .GenericEngine import Engine from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker +from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) +DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: - def __init__(self, db_name, model): - self.db_engine = Engine.get_engine(db_name) + def __init__(self, model): + self.db_engine = Engine.get_engine(DB_NAME) if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') - self.db_name = db_name + self.db_name = DB_NAME self.db_model = model self.db_table = model.__name__ self.Session = sessionmaker(bind=self.db_engine) diff --git a/src/kpi_manager/database/KpiDB.py b/src/kpi_manager/database/KpiDB.py index cd1b20e19..d503f06f4 100644 --- a/src/kpi_manager/database/KpiDB.py +++ b/src/kpi_manager/database/KpiDB.py @@ -13,21 +13,17 @@ # limitations under the License. 
import logging -from common.Settings import get_setting - from common.method_wrappers.Decorator import MetricsPool -from .KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class KpiDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -38,33 +34,33 @@ class KpiDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) if filter_object.kpi_sample_type: - query = query.filter(Model.kpi_sample_type.in_(filter_object.kpi_sample_type)) + query = query.filter(model.kpi_sample_type.in_(filter_object.kpi_sample_type)) if filter_object.device_id: - query = query.filter(Model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) + query = query.filter(model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) if filter_object.endpoint_id: - query = query.filter(Model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) + query = query.filter(model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) if filter_object.service_id: - query = query.filter(Model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) + query = query.filter(model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) if filter_object.slice_id: - query = query.filter(Model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) + query = query.filter(model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) if filter_object.connection_id: - query = query.filter(Model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) + query = query.filter(model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) if filter_object.link_id: - query = query.filter(Model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) + query = query.filter(model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. 
ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 57acf3afe..3f9ae8492 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -28,7 +28,7 @@ METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC') class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): def __init__(self): LOGGER.info('Init KpiManagerService') - self.kpi_db_obj = KpiDB() + self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 8af209e2c..9ec9e191f 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryBackendService() diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index 66cbfe77f..110c7e80a 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -13,20 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class TelemetryDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,11 +34,11 @@ class TelemetryDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. 
ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 43ac9455e..61446fcf8 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -26,7 +26,7 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer from telemetry.database.TelemetryModel import Collector as CollectorModel # from telemetry.database.Telemetry_DB import TelemetryDB -from ...database.Telemetry_DB import TelemetryDB +from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer @@ -41,7 +41,7 @@ ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.tele_db_obj = TelemetryDB() + self.tele_db_obj = TelemetryDB(CollectorModel) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'frontend', diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 60b96646a..c18f7ba4c 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryFrontendService() -- GitLab From e30c0353f21c8bfae11d4b459f02f7263a9ca838 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 30 Sep 2024 16:56:10 +0000 Subject: [PATCH 10/13] Rrmoved all secrets from the tfs.sh. Generic secret for all sevices. --- deploy/tfs.sh | 44 ++----------------------------- manifests/analyticsservice.yaml | 2 +- manifests/kpi_managerservice.yaml | 2 +- manifests/telemetryservice.yaml | 2 +- 4 files changed, 5 insertions(+), 45 deletions(-) diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 4268d50b2..65c1e8de2 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -147,56 +147,16 @@ sleep 2 printf "\n" echo ">>> Create Secret with CockroachDB data..." 
-echo "For Context" CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -# CRDB_DATABASE_CONTEXT="${CRDB_DATABASE}_context" # TODO: change by specific configurable environment variable kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require -# printf "\n" - # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \ - -echo "For KPI Management" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -# CRDB_DATABASE_KPI_MGMT="${CRDB_DATABASE}_kpi" -kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -# printf "\n" - # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \ - -echo "For Telemetry" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -# CRDB_DATABASE_TELEMETRY="${CRDB_DATABASE}_telemetry" -kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -# printf "\n" - # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \ - -echo "For Analytics" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -# CRDB_DATABASE_ANALYTICS="${CRDB_DATABASE}_analytics" -kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -# printf "\n" - # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \ +printf "\n" -echo ">>> Create Secret with Apache Kakfa" -echo "For KPI, Telemetry and Analytics" +echo ">>> Create Secret with Apache Kakfa..." 
 KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
 kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml
index e15214dbd..61666ead9 100644
--- a/manifests/analyticsservice.yaml
+++ b/manifests/analyticsservice.yaml
@@ -43,7 +43,7 @@ spec:
             value: "9192"
           envFrom:
           - secretRef:
-              name: crdb-analytics
+              name: crdb-data
           - secretRef:
               name: kfk-kpi-data
           readinessProbe:
diff --git a/manifests/kpi_managerservice.yaml b/manifests/kpi_managerservice.yaml
index efc3a720d..31eaf1284 100644
--- a/manifests/kpi_managerservice.yaml
+++ b/manifests/kpi_managerservice.yaml
@@ -43,7 +43,7 @@ spec:
             value: "tfs_kpi"
           envFrom:
           - secretRef:
-              name: crdb-kpi-data
+              name: crdb-data
           readinessProbe:
             exec:
               command: ["/bin/grpc_health_probe", "-addr=:30010"]
diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml
index 9c979713d..c3763d6a9 100644
--- a/manifests/telemetryservice.yaml
+++ b/manifests/telemetryservice.yaml
@@ -43,7 +43,7 @@ spec:
             value: "9192"
          envFrom:
           - secretRef:
-              name: crdb-telemetry
+              name: crdb-data
           - secretRef:
               name: kfk-kpi-data
           readinessProbe:
-- 
GitLab

From c8d99dcbafccd112385d34467bf0a52e54fbbfef Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Mon, 30 Sep 2024 17:14:07 +0000
Subject: [PATCH 11/13] DB_NAME is moved to the GenericEngine class.

---
 src/analytics/database/Analyzer_DB.py         | 2 +-
 src/analytics/frontend/service/__main__.py    | 5 +----
 src/analytics/frontend/tests/test_frontend.py | 2 +-
 src/common/tools/database/GenericDatabase.py  | 5 ++---
 src/common/tools/database/GenericEngine.py    | 4 ++--
 src/kpi_manager/service/__main__.py           | 5 +----
 src/telemetry/frontend/service/__main__.py    | 5 +----
 7 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py
index 99e9c62f6..ab0b50f2e 100644
--- a/src/analytics/database/Analyzer_DB.py
+++ b/src/analytics/database/Analyzer_DB.py
@@ -53,4 +53,4 @@ class AnalyzerDB(Database):
             LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, model) \ No newline at end of file + return super().select_with_filter(query, session, model) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index cc324e2dc..1df996785 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -18,10 +18,7 @@ from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService from analytics.database.AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database -from common.Settings import get_setting - -DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() LOGGER = None @@ -42,7 +39,7 @@ def main(): LOGGER.info('Starting...') # To create DB - kpiDBobj = Database(DB_NAME, Model) + kpiDBobj = Database(Model) kpiDBobj.create_database() kpiDBobj.create_tables() diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index f7a25f4c7..44e84e468 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -123,4 +123,4 @@ def test_SelectAnalytics(analyticsFrontend_client): # class_obj = AnalyticsFrontendServiceServicerImpl() # for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid): # LOGGER.debug(response) -# assert isinstance(response, tuple) \ No newline at end of file +# assert isinstance(response, tuple) diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py index 9868401ec..bc73581e3 100644 --- a/src/common/tools/database/GenericDatabase.py +++ b/src/common/tools/database/GenericDatabase.py @@ -23,11 +23,10 @@ from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) -DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: def __init__(self, model): - self.db_engine = Engine.get_engine(DB_NAME) + self.db_engine = Engine.get_engine() if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') @@ -48,7 +47,7 @@ class Database: def create_tables(self): try: self.db_model.metadata.create_all(self.db_engine) - LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) + LOGGER.debug("Tables created in the database: {:}".format(self.db_table)) except Exception as e: LOGGER.debug("Tables cannot be created in the database. 
{:s}".format(str(e))) raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py index ff3def466..8c458285d 100644 --- a/src/common/tools/database/GenericEngine.py +++ b/src/common/tools/database/GenericEngine.py @@ -20,12 +20,12 @@ CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster class Engine: @staticmethod - def get_engine(db_name) -> sqlalchemy.engine.Engine: + def get_engine() -> sqlalchemy.engine.Engine: crdb_uri = get_setting('CRDB_URI', default=None) if crdb_uri is None: CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = db_name + CRDB_DATABASE = get_setting('CRDB_DATABASE', default=None) CRDB_USERNAME = get_setting('CRDB_USERNAME') CRDB_PASSWORD = get_setting('CRDB_PASSWORD') CRDB_SSLMODE = get_setting('CRDB_SSLMODE') diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py index 6a3d078e2..05e32bb58 100644 --- a/src/kpi_manager/service/__main__.py +++ b/src/kpi_manager/service/__main__.py @@ -18,10 +18,7 @@ from .KpiManagerService import KpiManagerService from kpi_manager.database.KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database -from common.Settings import get_setting - -DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() LOGGER = None @@ -42,7 +39,7 @@ def main(): LOGGER.debug('Starting...') # To create DB - kpiDBobj = Database(DB_NAME, Model) + kpiDBobj = Database(Model) kpiDBobj.create_database() kpiDBobj.create_tables() diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index c18f7ba4c..6697ff5f1 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -18,10 +18,7 @@ from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService from telemetry.database.TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database -from common.Settings import get_setting - -DB_NAME = get_setting('CRDB_DATABASE', default=None) terminate = threading.Event() LOGGER = None @@ -42,7 +39,7 @@ def main(): LOGGER.info('Starting...') # To create DB - kpiDBobj = Database(DB_NAME, Model) + kpiDBobj = Database(Model) kpiDBobj.create_database() kpiDBobj.create_tables() -- GitLab From 81b23fd8a52f49e2d5ca7ea7896564cdaa5221e7 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Mon, 30 Sep 2024 17:18:52 +0000 Subject: [PATCH 12/13] Minor changes in Generic DB class. 
---
 src/common/tools/database/GenericDatabase.py                 | 1 -
 src/common/tools/database/GenericEngine.py                   | 2 +-
 .../frontend/service/TelemetryFrontendServiceServicerImpl.py | 1 -
 src/telemetry/tests/test_telemetryDB.py                      | 3 ++-
 4 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py
index bc73581e3..0cd41b9ef 100644
--- a/src/common/tools/database/GenericDatabase.py
+++ b/src/common/tools/database/GenericDatabase.py
@@ -30,7 +30,6 @@ class Database:
         if self.db_engine is None:
             LOGGER.error('Unable to get SQLAlchemy DB Engine...')
             raise Exception('Failed to initialize the database engine.')
-        self.db_name = DB_NAME
         self.db_model = model
         self.db_table = model.__name__
         self.Session = sessionmaker(bind=self.db_engine)
diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py
index 8c458285d..18bb15360 100644
--- a/src/common/tools/database/GenericEngine.py
+++ b/src/common/tools/database/GenericEngine.py
@@ -25,7 +25,7 @@ class Engine:
         if crdb_uri is None:
             CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
             CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = get_setting('CRDB_DATABASE', default=None)
+            CRDB_DATABASE  = get_setting('CRDB_DATABASE')
             CRDB_USERNAME  = get_setting('CRDB_USERNAME')
             CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
             CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index 61446fcf8..c72e66bdd 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -25,7 +25,6 @@ from common.proto.telemetry_frontend_pb2 import CollectorId, Collector, Collecto
 from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceServicer
 
 from telemetry.database.TelemetryModel import Collector as CollectorModel
-# from telemetry.database.Telemetry_DB import TelemetryDB
 from telemetry.database.Telemetry_DB import TelemetryDB
 
 from confluent_kafka import Consumer as KafkaConsumer
diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py
index 0d5bd0584..6fc02f5e1 100644
--- a/src/telemetry/tests/test_telemetryDB.py
+++ b/src/telemetry/tests/test_telemetryDB.py
@@ -25,4 +25,5 @@ def test_verify_databases_and_tables():
     # TelemetryDBobj.verify_tables()
     TelemetryDBobj.create_database()
     TelemetryDBobj.create_tables()
-    TelemetryDBobj.verify_tables()
\ No newline at end of file
+    TelemetryDBobj.verify_tables()
+    
\ No newline at end of file
-- 
GitLab

From 14e6cbb7fc6da92e2cd8f2a39b3701d5acff2884 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Mon, 30 Sep 2024 17:20:47 +0000
Subject: [PATCH 13/13] Added trailing newline at end of file.

---
 src/telemetry/tests/test_telemetryDB.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py
index 6fc02f5e1..1b122e4bc 100644
--- a/src/telemetry/tests/test_telemetryDB.py
+++ b/src/telemetry/tests/test_telemetryDB.py
@@ -26,4 +26,3 @@ def test_verify_databases_and_tables():
     TelemetryDBobj.create_database()
     TelemetryDBobj.create_tables()
     TelemetryDBobj.verify_tables()
-    
\ No newline at end of file
-- 
GitLab
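
Usage sketch for the series as a whole (illustrative, not part of any patch):
after these changes every service builds its DB handle the same way, passing
only its model class, while the engine resolves the connection from the CRDB_*
values of the shared crdb-data secret. Assuming those environment variables
are set:

    from common.tools.database.GenericDatabase import Database
    from kpi_manager.database.KpiModel import Kpi as Model

    kpiDBobj = Database(Model)   # engine resolves CRDB_DATABASE internally
    kpiDBobj.create_database()   # no-op when the database already exists
    kpiDBobj.create_tables()
    kpiDBobj.verify_tables()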