From bdfc066bbe56c789240aa0ec3adbb40088b1b847 Mon Sep 17 00:00:00 2001 From: Waleed Akbar <wakbar@cttc.es> Date: Mon, 30 Sep 2024 16:47:18 +0000 Subject: [PATCH] Moved the Model object from the DB class to the implementation class. --- src/analytics/backend/service/__main__.py | 2 +- src/analytics/database/Analyzer_DB.py | 19 ++++++------- .../AnalyticsFrontendServiceServicerImpl.py | 2 +- src/analytics/frontend/service/__main__.py | 2 +- src/common/tools/database/GenericDatabase.py | 8 ++++-- src/kpi_manager/database/KpiDB.py | 28 ++++++++----------- .../service/KpiManagerServiceServicerImpl.py | 2 +- src/telemetry/backend/service/__main__.py | 2 +- src/telemetry/database/Telemetry_DB.py | 13 ++++----- .../TelemetryFrontendServiceServicerImpl.py | 4 +-- src/telemetry/frontend/service/__main__.py | 2 +- 11 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 9b1941e20..3c4c36b7c 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsBackendService() diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py index 8420b66b5..99e9c62f6 100644 --- a/src/analytics/database/Analyzer_DB.py +++ b/src/analytics/database/Analyzer_DB.py @@ -13,20 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class AnalyzerDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,23 +34,23 @@ class AnalyzerDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: - query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: - query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) + query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] - query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) + query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] - query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) + query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) \ No newline at end of file + return super().select_with_filter(query, session, model) \ No newline at end of file diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index ab274ba36..a7fc8d492 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value - self.db_obj = AnalyzerDB() + self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index bf94eb5e6..cc324e2dc 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py index 7c6453d7c..9868401ec 100644 --- a/src/common/tools/database/GenericDatabase.py +++ b/src/common/tools/database/GenericDatabase.py @@ -18,18 +18,20 @@ import sqlalchemy_utils from .GenericEngine import Engine from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker +from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) +DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: - def __init__(self, db_name, model): - self.db_engine = Engine.get_engine(db_name) + def __init__(self, model): + self.db_engine = Engine.get_engine(DB_NAME) if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') - self.db_name = db_name + self.db_name = DB_NAME self.db_model = model self.db_table = model.__name__ self.Session = sessionmaker(bind=self.db_engine) diff --git a/src/kpi_manager/database/KpiDB.py b/src/kpi_manager/database/KpiDB.py index cd1b20e19..d503f06f4 100644 --- a/src/kpi_manager/database/KpiDB.py +++ b/src/kpi_manager/database/KpiDB.py @@ -13,21 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting - from common.method_wrappers.Decorator import MetricsPool -from .KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class KpiDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -38,33 +34,33 @@ class KpiDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) if filter_object.kpi_sample_type: - query = query.filter(Model.kpi_sample_type.in_(filter_object.kpi_sample_type)) + query = query.filter(model.kpi_sample_type.in_(filter_object.kpi_sample_type)) if filter_object.device_id: - query = query.filter(Model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) + query = query.filter(model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) if filter_object.endpoint_id: - query = query.filter(Model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) + query = query.filter(model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) if filter_object.service_id: - query = query.filter(Model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) + query = query.filter(model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) if filter_object.slice_id: - query = query.filter(Model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) + query = query.filter(model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) if filter_object.connection_id: - query = query.filter(Model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) + query = query.filter(model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) if filter_object.link_id: - query = query.filter(Model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) + query = query.filter(model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 57acf3afe..3f9ae8492 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -28,7 +28,7 @@ METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC') class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): def __init__(self): LOGGER.info('Init KpiManagerService') - self.kpi_db_obj = KpiDB() + self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 8af209e2c..9ec9e191f 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryBackendService() diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index 66cbfe77f..110c7e80a 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -13,20 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class TelemetryDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,11 +34,11 @@ class TelemetryDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 43ac9455e..61446fcf8 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -26,7 +26,7 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer from telemetry.database.TelemetryModel import Collector as CollectorModel # from telemetry.database.Telemetry_DB import TelemetryDB -from ...database.Telemetry_DB import TelemetryDB +from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer @@ -41,7 +41,7 @@ ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.tele_db_obj = TelemetryDB() + self.tele_db_obj = TelemetryDB(CollectorModel) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'frontend', diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 60b96646a..c18f7ba4c 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryFrontendService() -- GitLab