diff --git a/deploy/tfs.sh b/deploy/tfs.sh
index da078a4f3d9b4c441085a9ae0b8c532ca1f66032..4268d50b2504cc4a8767d3bb9ff4c8fcdae7d1af 100755
--- a/deploy/tfs.sh
+++ b/deploy/tfs.sh
@@ -146,55 +146,57 @@ kubectl create namespace $TFS_K8S_NAMESPACE
 sleep 2
 printf "\n"
 
-echo "Create secret with CockroachDB data"
+echo ">>> Create Secret with CockroachDB data..."
+echo "For Context"
 CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
-CRDB_DATABASE_CONTEXT=${CRDB_DATABASE} # TODO: change by specific configurable environment variable
+# CRDB_DATABASE_CONTEXT="${CRDB_DATABASE}_context" # TODO: change by specific configurable environment variable
 kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
     --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
-    --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \
     --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
    --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
     --from-literal=CRDB_SSLMODE=require
-printf "\n"
+# printf "\n"
+    # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \
 
-echo "Create secret with CockroachDB data for KPI Management microservices"
+echo "For KPI Management"
 CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
-CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable
+# CRDB_DATABASE_KPI_MGMT="${CRDB_DATABASE}_kpi"
 kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
     --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
-    --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \
     --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
     --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
     --from-literal=CRDB_SSLMODE=require
-printf "\n"
+# printf "\n"
+    # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \
 
-echo "Create secret with CockroachDB data for Telemetry microservices"
+echo "For Telemetry"
 CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
-CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: change by specific configurable environment variable
+# CRDB_DATABASE_TELEMETRY="${CRDB_DATABASE}_telemetry"
 kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
     --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
-    --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \
     --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
     --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
     --from-literal=CRDB_SSLMODE=require
-printf "\n"
+# printf "\n"
+    # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \
 
-echo "Create secret with CockroachDB data for Analytics microservices"
+echo "For Analytics"
 CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
-CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: change by specific configurable environment variable
+# CRDB_DATABASE_ANALYTICS="${CRDB_DATABASE}_analytics"
 kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
     --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
-    --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \
     --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
     --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
     --from-literal=CRDB_SSLMODE=require
-printf "\n"
+# printf "\n"
+    # --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \
 
-echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices"
+echo ">>> Create Secret with Apache Kafka"
+echo "For KPI, Telemetry and Analytics"
 KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
 kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
     --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml
index 0fa3ed0be6eda8cf944e199543e3c2cd59cc98d6..7340dff5facd3f77d6ea5ae88fedb21818183333 100644
--- a/manifests/analyticsservice.yaml
+++ b/manifests/analyticsservice.yaml
@@ -37,6 +37,8 @@ spec:
           env:
             - name: LOG_LEVEL
               value: "INFO"
+            - name: CRDB_DATABASE
+              value: "tfs_analytics"
           envFrom:
             - secretRef:
                 name: crdb-analytics
diff --git a/manifests/contextservice.yaml b/manifests/contextservice.yaml
index 3abc4f208da8b4820b589b798a328c4a971f55f0..0fc8a1c44f7358a962276ebcf38a165d2db986cd 100644
--- a/manifests/contextservice.yaml
+++ b/manifests/contextservice.yaml
@@ -45,6 +45,8 @@ spec:
               value: "FALSE"
             - name: ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY
               value: "FALSE"
+            - name: CRDB_DATABASE
+              value: "tfs_context"
           envFrom:
             - secretRef:
                 name: crdb-data
diff --git a/manifests/kpi_managerservice.yaml b/manifests/kpi_managerservice.yaml
index 984d783a9de7ed3c0c02e87d82ec673dc19c9508..efc3a720d5bd3f8f561f03fb679c2461dd0858e5 100644
--- a/manifests/kpi_managerservice.yaml
+++ b/manifests/kpi_managerservice.yaml
@@ -39,6 +39,8 @@ spec:
           env:
             - name: LOG_LEVEL
               value: "INFO"
+            - name: CRDB_DATABASE
+              value: "tfs_kpi"
           envFrom:
             - secretRef:
                 name: crdb-kpi-data
diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml
index 2f9917499a425b95d436ffa8cdb311d29483d2ca..7c781bb3d31ed929913966bb6a62b8f984355e25 100644
--- a/manifests/telemetryservice.yaml
+++ b/manifests/telemetryservice.yaml
@@ -37,6 +37,8 @@ spec:
           env:
             - name: LOG_LEVEL
               value: "INFO"
+            - name: CRDB_DATABASE
+              value: "tfs_telemetry"
           envFrom:
             - secretRef:
                 name: crdb-telemetry
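The manifest changes above move the CockroachDB database name out of the shared crdb-* secrets and into a per-service CRDB_DATABASE environment variable, so each microservice selects its own database (tfs_context, tfs_kpi, tfs_telemetry, tfs_analytics) while the secrets keep only the shared connection material. A minimal sketch of how a service is expected to resolve the name at startup, using the get_setting helper from common.Settings that appears elsewhere in this diff (the fail-fast guard is illustrative, not part of the change):

```python
# Sketch: resolving the per-service database name at service startup.
# CRDB_DATABASE is injected by the Deployment manifest; the remaining
# CRDB_* values still arrive from the crdb-* secret via envFrom.
from common.Settings import get_setting

DB_NAME = get_setting('CRDB_DATABASE', default=None)   # e.g. "tfs_context"
if DB_NAME is None:
    # Illustrative guard: fail fast if the manifest forgot the variable.
    raise RuntimeError('CRDB_DATABASE is not set; check the service manifest')
```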
diff --git a/src/analytics/database/AnalyzerEngine.py b/src/analytics/database/AnalyzerEngine.py
deleted file mode 100644
index 9294e09966ef9e13c9cfa3cab590e5d0c8b6a80e..0000000000000000000000000000000000000000
--- a/src/analytics/database/AnalyzerEngine.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging, sqlalchemy
-from common.Settings import get_setting
-
-LOGGER = logging.getLogger(__name__)
-CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
-
-class AnalyzerEngine:
-    @staticmethod
-    def get_engine() -> sqlalchemy.engine.Engine:
-        crdb_uri = get_setting('CRDB_URI', default=None)
-        if crdb_uri is None:
-            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
-            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = "tfs-analyzer" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
-            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
-            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
-            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
-            crdb_uri = CRDB_URI_TEMPLATE.format(
-                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        try:
-            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
-            LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri))
-        except: # pylint: disable=bare-except # pragma: no cover
-            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
-            return None # type: ignore
-        return engine
diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py
index 1ba68989a066e4638adc12e65289ed50b740731d..8420b66b522aa0d41e0d1c84a31c8cc5a3cc2c81 100644
--- a/src/analytics/database/Analyzer_DB.py
+++ b/src/analytics/database/Analyzer_DB.py
@@ -13,138 +13,47 @@
 # limitations under the License.
 
 import logging
-import sqlalchemy_utils
+from common.Settings import get_setting
+from common.method_wrappers.Decorator import MetricsPool
+from .AnalyzerModel import Analyzer as Model
+from common.tools.database.GenericDatabase import Database
+from common.method_wrappers.ServiceExceptions import OperationFailedException
 
-from sqlalchemy import inspect, or_
-from sqlalchemy.orm import sessionmaker
+LOGGER = logging.getLogger(__name__)
+METRICS_POOL = MetricsPool('Analytics', 'Database')
+DB_NAME = get_setting('CRDB_DATABASE', default=None)
 
-from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
-from analytics.database.AnalyzerEngine import AnalyzerEngine
-from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
+class AnalyzerDB(Database):
+    def __init__(self) -> None:
+        LOGGER.info('Init AnalyzerDB')
+        super().__init__(DB_NAME, Model)
 
-LOGGER = logging.getLogger(__name__)
-DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable
-
-class AnalyzerDB:
-    def __init__(self):
-        self.db_engine = AnalyzerEngine.get_engine()
-        if self.db_engine is None:
-            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
-            return False
-        self.db_name = DB_NAME
-        self.Session = sessionmaker(bind=self.db_engine)
-
-    def create_database(self):
-        if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
-            sqlalchemy_utils.create_database(self.db_engine.url)
-
-    def drop_database(self) -> None:
-        if sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.drop_database(self.db_engine.url)
-
-    def create_tables(self):
-        try:
-            AnalyzerModel.metadata.create_all(self.db_engine)     # type: ignore
-            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
-        except Exception as e:
{:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - inspect_object = inspect(self.db_engine) - if(inspect_object.has_table('analyzer', None)): - LOGGER.info("Table exists in DB: {:}".format(self.db_name)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - -# ----------------- CURD OPERATIONS --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. 
+        return:   SQLAlchemy session, query and Model
+        """
         session = self.Session()
         try:
-            query = session.query(AnalyzerModel)
-
+            query = session.query(Model)
             # Apply filters based on the filter_object
             if filter_object.analyzer_id:
-                query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
+                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
 
             if filter_object.algorithm_names:
-                query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
+                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
 
             if filter_object.input_kpi_ids:
                 input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
-                query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
+                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
 
             if filter_object.output_kpi_ids:
                 output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
-                query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
-
-            result = query.all()
-            # query should be added to return all rows
-            if result:
-                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
-            else:
-                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
-            return result
+                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
         except Exception as e:
-            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
-            raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
-        finally:
-            session.close()
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException ("CreateAnalyzerFilter", extra_details=["unable to create the filter {:}".format(e)])
+
+        return super().select_with_filter(query, session, Model)
\ No newline at end of file
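This refactor, like the matching ones for KPI Management and Telemetry below, delegates engine creation, the CRUD helpers, and query execution to common.tools.database.GenericDatabase.Database, whose code is not part of this diff. A sketch of the contract the subclasses rely on, with names inferred from the call sites super().__init__(DB_NAME, Model) and super().select_with_filter(query, session, Model) (all details assumed, not the actual implementation):

```python
# Assumed shape of the GenericDatabase base class (not shown in this diff).
import logging, sqlalchemy
from sqlalchemy.orm import sessionmaker
from common.Settings import get_setting

LOGGER = logging.getLogger(__name__)
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

class Database:
    def __init__(self, db_name, model):
        # Builds the engine the deleted *Engine.py classes used to build,
        # but takes the database name from the caller (CRDB_DATABASE).
        crdb_uri = CRDB_URI_TEMPLATE.format(
            get_setting('CRDB_USERNAME'), get_setting('CRDB_PASSWORD'),
            get_setting('CRDB_NAMESPACE'), get_setting('CRDB_SQL_PORT'),
            db_name, get_setting('CRDB_SSLMODE'))
        self.db_engine = sqlalchemy.create_engine(crdb_uri, echo=False)
        self.db_name   = db_name
        self.db_model  = model
        self.Session   = sessionmaker(bind=self.db_engine)

    def select_with_filter(self, query, session, model):
        # Subclasses build the filtered query; the base class executes it
        # and guarantees the session is closed, removing the per-subclass
        # result/finally boilerplate deleted above.
        try:
            result = query.all()
            if not result:
                LOGGER.warning('No matching row found in %s table', model.__name__)
            return result
        finally:
            session.close()
```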
diff --git a/src/analytics/database/new_Analyzer_DB.py b/src/analytics/database/new_Analyzer_DB.py
deleted file mode 100644
index 3b3a746410b65393ddf455847d4208e465e6914f..0000000000000000000000000000000000000000
--- a/src/analytics/database/new_Analyzer_DB.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from common.method_wrappers.Decorator import MetricsPool
-from .AnalyzerModel import Analyzer as Model
-from common.tools.database.GenericDatabase import Database
-from common.method_wrappers.ServiceExceptions import OperationFailedException
-
-LOGGER = logging.getLogger(__name__)
-METRICS_POOL = MetricsPool('KpiManager', 'Database')
-
-class AnalyzerDB(Database):
-    def __init__(self) -> None:
-        LOGGER.info('Init KpiManagerService')
-        super().__init__("tfs_kpi_mgmt", Model)
-
-    def add_row_to_db(self, row):
-        return super().add_row_to_db(row)
-
-    def search_db_row_by_id(self, model, col_name, id_to_search):
-        return super().search_db_row_by_id(model, col_name, id_to_search)
-
-    def delete_db_row_by_id(self, model, col_name, id_to_search):
-        return super().delete_db_row_by_id(model, col_name, id_to_search)
-
-    def select_with_filter(self, model, filter_object):
-        """
-        Generic method to create filters dynamically based on filter_object attributes.
-        params:   model:         SQLAlchemy model class to query.
-                  filter_object: Object that contains filtering criteria as attributes.
-        return:   SQLAlchemy session, query and Model
-        """
-        session = self.Session()
-        try:
-            query = session.query(Model)
-            # Apply filters based on the filter_object
-            if filter_object.analyzer_id:
-                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
-
-            if filter_object.algorithm_names:
-                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
-
-            if filter_object.input_kpi_ids:
-                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
-                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
-
-            if filter_object.output_kpi_ids:
-                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
-                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
-        except Exception as e:
-            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
-            raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
-
-        return super().select_with_filter(query, session, Model)
\ No newline at end of file
diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
index 5ea0e2ef60227ecf05ae89b57f496ac86d3631b5..ab274ba36bf945488769d5ffde297ac10349b320 100644
--- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
+++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py
@@ -25,7 +25,7 @@ from common.proto.context_pb2 import Empty
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
-from analytics.database.new_Analyzer_DB import AnalyzerDB
+from analytics.database.Analyzer_DB import AnalyzerDB
 from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/KpiDB.py
similarity index 87%
rename from src/kpi_manager/database/new_KpiDB.py
rename to src/kpi_manager/database/KpiDB.py
index 8aa1cd20bacdc39d6efe8f33a3c1475ead53c652..cd1b20e1954d85a68d375979d0d9441837f7a252 100644
--- a/src/kpi_manager/database/new_KpiDB.py
+++ b/src/kpi_manager/database/KpiDB.py
@@ -13,28 +13,21 @@
 # limitations under the License.
 
 import logging
+from common.Settings import get_setting
 from common.method_wrappers.Decorator import MetricsPool
 from .KpiModel import Kpi as Model
 from common.tools.database.GenericDatabase import Database
 from common.method_wrappers.ServiceExceptions import OperationFailedException
 
-LOGGER = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('KpiManager', 'Database')
+DB_NAME = get_setting('CRDB_DATABASE', default=None)
 
 class KpiDB(Database):
     def __init__(self) -> None:
         LOGGER.info('Init KpiManagerService')
-        super().__init__("tfs_kpi_mgmt", Model)
-
-    def add_row_to_db(self, row):
-        return super().add_row_to_db(row)
-
-    def search_db_row_by_id(self, model, col_name, id_to_search):
-        return super().search_db_row_by_id(model, col_name, id_to_search)
-
-    def delete_db_row_by_id(self, model, col_name, id_to_search):
-        return super().delete_db_row_by_id(model, col_name, id_to_search)
+        super().__init__(DB_NAME, Model)
 
     def select_with_filter(self, model, filter_object):
         """
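With the rename, the module path finally matches the class it exports, and the pass-through overrides deleted above are simply inherited from the base class. A short usage sketch of the renamed KpiDB; the CRUD helper names come from the deleted pass-throughs, while the UUID value is purely illustrative:

```python
# Sketch: the helpers formerly re-declared here are now inherited.
from kpi_manager.database.KpiDB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel

kpi_db = KpiDB()   # database name resolved from CRDB_DATABASE, e.g. "tfs_kpi"
row = kpi_db.search_db_row_by_id(KpiModel, 'kpi_id', '123e4567-e89b-12d3-a456-426614174000')
if row is not None:
    kpi_db.delete_db_row_by_id(KpiModel, 'kpi_id', row.kpi_id)
```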
diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py
deleted file mode 100644
index 0fce7e3d36cf2f03a18f311c815719a4f17b2869..0000000000000000000000000000000000000000
--- a/src/kpi_manager/database/KpiEngine.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging, sqlalchemy
-from common.Settings import get_setting
-
-LOGGER = logging.getLogger(__name__)
-CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
-
-class KpiEngine:
-    @staticmethod
-    def get_engine() -> sqlalchemy.engine.Engine:
-        crdb_uri = get_setting('CRDB_URI', default=None)
-        if crdb_uri is None:
-            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
-            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
-            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
-            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
-            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
-            crdb_uri = CRDB_URI_TEMPLATE.format(
-                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        try:
-            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
-            LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri))
-        except: # pylint: disable=bare-except # pragma: no cover
-            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
-            return None # type: ignore
-        return engine
diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/kpi_manager/database/Kpi_DB.py
deleted file mode 100644
index 49ad9c9b579daa918818366a1d9505089968edc2..0000000000000000000000000000000000000000
--- a/src/kpi_manager/database/Kpi_DB.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import sqlalchemy_utils
-from sqlalchemy.orm import sessionmaker
-from kpi_manager.database.KpiEngine import KpiEngine
-from kpi_manager.database.KpiModel import Kpi as KpiModel
-from common.method_wrappers.ServiceExceptions import (
-    AlreadyExistsException, OperationFailedException , NotFoundException)
-
-LOGGER = logging.getLogger(__name__)
-DB_NAME = "tfs_kpi_mgmt"
-
-class KpiDB:
-    def __init__(self):
-        self.db_engine = KpiEngine.get_engine()
-        if self.db_engine is None:
-            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
-            return False
-        self.db_name = DB_NAME
-        self.Session = sessionmaker(bind=self.db_engine)
-
-    def create_database(self) -> None:
-        if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.create_database(self.db_engine.url)
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
-
-    def drop_database(self) -> None:
-        if sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.drop_database(self.db_engine.url)
-
-    def create_tables(self):
-        # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables.
-        try:
-            KpiModel.metadata.create_all(self.db_engine)     # type: ignore
-            LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
-        except Exception as e:
{:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - with self.db_engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() # type: ignore - LOGGER.debug("Tables verified: {:}".format(tables)) - except Exception as e: - LOGGER.debug("Unable to fetch Table names. {:s}".format(str(e))) - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def select_with_filter(self, model, filter_object): - session = self.Session() - try: - query = session.query(KpiModel) - # Apply filters based on the filter_object - if filter_object.kpi_id: - query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - - if filter_object.kpi_sample_type: - query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type)) - - if filter_object.device_id: - query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) - - if filter_object.endpoint_id: - query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) - - if filter_object.service_id: - query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) - - if filter_object.slice_id: - query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) - - if 
-            if filter_object.connection_id:
-                query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
-
-            if filter_object.link_id:
-                query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
-            result = query.all()
-
-            if result:
-                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
-            else:
-                LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}")
-            return result
-        except Exception as e:
-            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
-            raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
-        finally:
-            session.close()
diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
index 39bc9e0b05285e76173d22b288a0cf34d8337da2..57acf3afe5bfaa4615d74f6865f3231ae0d5ef6f 100644
--- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
+++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py
@@ -19,7 +19,7 @@ from common.proto.context_pb2 import Empty
 from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer
 from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
 # from kpi_manager.database.Kpi_DB import KpiDB
-from kpi_manager.database.new_KpiDB import KpiDB
+from kpi_manager.database.KpiDB import KpiDB
 from kpi_manager.database.KpiModel import Kpi as KpiModel
 
 LOGGER = logging.getLogger(__name__)
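The servicer keeps handing the incoming protobuf filter straight to select_with_filter; only the import path changes. A hypothetical sketch of that call path, with the filter field names inferred from the filter-handling code in this diff (the UUID value is illustrative):

```python
# Sketch: servicer-side filter path after the rename (field names inferred).
from common.proto.kpi_manager_pb2 import KpiDescriptorFilter
from kpi_manager.database.KpiDB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel

kpi_db = KpiDB()
kpi_filter = KpiDescriptorFilter()
kpi_filter.kpi_id.add().kpi_id.uuid = '123e4567-e89b-12d3-a456-426614174000'
rows = kpi_db.select_with_filter(KpiModel, kpi_filter)  # subclass builds the query, base executes it
```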
diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py
deleted file mode 100644
index 7c8620faf25e695e7f971bce78be9ad208a7701b..0000000000000000000000000000000000000000
--- a/src/telemetry/database/TelemetryEngine.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging, sqlalchemy
-from common.Settings import get_setting
-
-LOGGER = logging.getLogger(__name__)
-CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
-
-class TelemetryEngine:
-    @staticmethod
-    def get_engine() -> sqlalchemy.engine.Engine:
-        crdb_uri = get_setting('CRDB_URI', default=None)
-        if crdb_uri is None:
-            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
-            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
-            CRDB_DATABASE  = "tfs-telemetry" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
-            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
-            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
-            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
-            crdb_uri = CRDB_URI_TEMPLATE.format(
-                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        try:
-            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
-            LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri))
-        except: # pylint: disable=bare-except # pragma: no cover
-            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
-            return None # type: ignore
-        return engine
diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py
index 32acfd73a410a7bfddd6b487d0b1962afadb3842..66cbfe77ff879afd04eb5c5449a8f1dee8d07d9a 100644
--- a/src/telemetry/database/Telemetry_DB.py
+++ b/src/telemetry/database/Telemetry_DB.py
@@ -13,125 +13,35 @@
 # limitations under the License.
 
 import logging
-import sqlalchemy_utils
-from sqlalchemy import inspect
-from sqlalchemy.orm import sessionmaker
-from telemetry.database.TelemetryModel import Collector as CollectorModel
-from telemetry.database.TelemetryEngine import TelemetryEngine
-from common.method_wrappers.ServiceExceptions import (
-    OperationFailedException, AlreadyExistsException )
-
-LOGGER = logging.getLogger(__name__)
-DB_NAME = "tfs_telemetry"
-
-class TelemetryDB:
-    def __init__(self):
-        self.db_engine = TelemetryEngine.get_engine()
-        if self.db_engine is None:
-            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
-            return False
-        self.db_name = DB_NAME
-        self.Session = sessionmaker(bind=self.db_engine)
-
-    def create_database(self):
-        if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
-            sqlalchemy_utils.create_database(self.db_engine.url)
-
-    def drop_database(self) -> None:
-        if sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.drop_database(self.db_engine.url)
-
-    def create_tables(self):
-        try:
-            CollectorModel.metadata.create_all(self.db_engine)     # type: ignore
-            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
-        except Exception as e:
-            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
-            raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
-
-    def verify_tables(self):
-        try:
-            inspect_object = inspect(self.db_engine)
-            if(inspect_object.has_table('collector', None)):
-                LOGGER.info("Table exists in DB: {:}".format(self.db_name))
-        except Exception as e:
{:s}".format(str(e))) - -# ----------------- CURD METHODs --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() +from common.Settings import get_setting +from common.method_wrappers.Decorator import MetricsPool +from .TelemetryModel import Collector as Model +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') +DB_NAME = get_setting('CRDB_DATABASE', default=None) + +class TelemetryDB(Database): + def __init__(self) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__(DB_NAME, Model) def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. 
+        return:   SQLAlchemy session, query and Model
+        """
         session = self.Session()
         try:
-            query = session.query(CollectorModel)
-            # Apply filters based on the filter_object
+            query = session.query(Model)
             if filter_object.kpi_id:
-                query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
-            result = query.all()
-            # query should be added to return all rows
-            if result:
-                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
-            else:
-                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
-            return result
+                query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
         except Exception as e:
-            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
-            raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
-        finally:
-            session.close()
-
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException ("CreateCollectorFilter", extra_details=["unable to create the filter {:}".format(e)])
+
+        return super().select_with_filter(query, session, Model)
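One consequence of reading DB_NAME at module level: CRDB_DATABASE must be present in the environment before the module is first imported, or DB_NAME silently stays None. A minimal pytest-style sketch of checking that wiring (the module reload is needed precisely because the value is captured at import time):

```python
# Sketch: verify DB_NAME follows the CRDB_DATABASE environment variable.
import importlib

def test_db_name_comes_from_env(monkeypatch):
    monkeypatch.setenv('CRDB_DATABASE', 'tfs_telemetry')
    import telemetry.database.Telemetry_DB as telemetry_db
    importlib.reload(telemetry_db)   # re-evaluate module-level DB_NAME
    assert telemetry_db.DB_NAME == 'tfs_telemetry'
```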
diff --git a/src/telemetry/database/new_Telemetry_DB.py b/src/telemetry/database/new_Telemetry_DB.py
deleted file mode 100644
index bd9d863d29c602885a503e2f065bae9790d92428..0000000000000000000000000000000000000000
--- a/src/telemetry/database/new_Telemetry_DB.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
-from .TelemetryModel import Collector as Model
-from common.tools.database.GenericDatabase import Database
-from common.method_wrappers.ServiceExceptions import OperationFailedException
-
-LOGGER = logging.getLogger(__name__)
-METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database')
-
-class TelemetryDB(Database):
-    def __init__(self) -> None:
-        LOGGER.info('Init KpiManagerService')
-        super().__init__("tfs_telemetry", Model)
-
-    def add_row_to_db(self, row):
-        return super().add_row_to_db(row)
-
-    def search_db_row_by_id(self, model, col_name, id_to_search):
-        return super().search_db_row_by_id(model, col_name, id_to_search)
-
-    def delete_db_row_by_id(self, model, col_name, id_to_search):
-        return super().delete_db_row_by_id(model, col_name, id_to_search)
-
-    def select_with_filter(self, model, filter_object):
-        """
-        Generic method to create filters dynamically based on filter_object attributes.
-        params:   model:         SQLAlchemy model class to query.
-                  filter_object: Object that contains filtering criteria as attributes.
-        return:   SQLAlchemy session, query and Model
-        """
-        session = self.Session()
-        try:
-            query = session.query(Model)
-            # Apply filters based on the filter_object
-            if filter_object.kpi_id:
-                query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
-            # query should be added to return all rows
-        except Exception as e:
-            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
-            raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
-
-        return super().select_with_filter(query, session, Model)
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index 9068de0db9d30fd2add0594effaa82fee189f7b6..43ac9455ebf291ab51dff3eb3d33d5072fce1ffa 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -26,7 +26,7 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer
 from telemetry.database.TelemetryModel import Collector as CollectorModel
 
 # from telemetry.database.Telemetry_DB import TelemetryDB
-from ...database.new_Telemetry_DB import TelemetryDB
+from ...database.Telemetry_DB import TelemetryDB
 
 from confluent_kafka import Consumer as KafkaConsumer
 from confluent_kafka import Producer as KafkaProducer