diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 9b1941e206c71f0870d0f66ff9b308cc1e68c8e7..3c4c36b7c7bd952164bf9e48a45e22fb00575564 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsBackendService() diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py index 8420b66b522aa0d41e0d1c84a31c8cc5a3cc2c81..99e9c62f6b4c529e3aa62368247a56c206253a56 100644 --- a/src/analytics/database/Analyzer_DB.py +++ b/src/analytics/database/Analyzer_DB.py @@ -13,20 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class AnalyzerDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,23 +34,23 @@ class AnalyzerDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: - query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: - query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) + query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] - query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) + query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] - query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) + query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) \ No newline at end of file + return super().select_with_filter(query, session, model) \ No newline at end of file diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index ab274ba36bf945488769d5ffde297ac10349b320..a7fc8d49248ff01a860accac1b64a29d5533069f 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value - self.db_obj = AnalyzerDB() + self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index bf94eb5e6d968c221100af14def35bb2bb3545a3..cc324e2dc9b4a963d49b9c11f4edfb691b717f35 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() diff --git a/src/common/tools/database/GenericDatabase.py b/src/common/tools/database/GenericDatabase.py index 7c6453d7c46eb9b9a46bfe6346fe89b495e0b8c9..9868401ec05eabbb220f7c170a2ab66dad09e998 100644 --- a/src/common/tools/database/GenericDatabase.py +++ b/src/common/tools/database/GenericDatabase.py @@ -18,18 +18,20 @@ import sqlalchemy_utils from .GenericEngine import Engine from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker +from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) +DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: - def __init__(self, db_name, model): - self.db_engine = Engine.get_engine(db_name) + def __init__(self, model): + self.db_engine = Engine.get_engine(DB_NAME) if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') - self.db_name = db_name + self.db_name = DB_NAME self.db_model = model self.db_table = model.__name__ self.Session = sessionmaker(bind=self.db_engine) diff --git a/src/kpi_manager/database/KpiDB.py b/src/kpi_manager/database/KpiDB.py index cd1b20e1954d85a68d375979d0d9441837f7a252..d503f06f4cdeb57efd4c02701803f81fd31d3eea 100644 --- a/src/kpi_manager/database/KpiDB.py +++ b/src/kpi_manager/database/KpiDB.py @@ -13,21 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting - from common.method_wrappers.Decorator import MetricsPool -from .KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class KpiDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -38,33 +34,33 @@ class KpiDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) # Apply filters based on the filter_object if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) if filter_object.kpi_sample_type: - query = query.filter(Model.kpi_sample_type.in_(filter_object.kpi_sample_type)) + query = query.filter(model.kpi_sample_type.in_(filter_object.kpi_sample_type)) if filter_object.device_id: - query = query.filter(Model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) + query = query.filter(model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) if filter_object.endpoint_id: - query = query.filter(Model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) + query = query.filter(model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) if filter_object.service_id: - query = query.filter(Model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) + query = query.filter(model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) if filter_object.slice_id: - query = query.filter(Model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) + query = query.filter(model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) if filter_object.connection_id: - query = query.filter(Model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) + query = query.filter(model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) if filter_object.link_id: - query = query.filter(Model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) + query = query.filter(model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 57acf3afe5bfaa4615d74f6865f3231ae0d5ef6f..3f9ae8492380e5e11cd3cbc926a2fce07620d8a7 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -28,7 +28,7 @@ METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC') class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): def __init__(self): LOGGER.info('Init KpiManagerService') - self.kpi_db_obj = KpiDB() + self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 8af209e2c2b5f13d4f37a9b865b85e31d702cf8c..9ec9e191fd22e07da46f80214ade0ac516032433 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryBackendService() diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index 66cbfe77ff879afd04eb5c5449a8f1dee8d07d9a..110c7e80a4c36eed15417bfa05c4057ccb7fe292 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -13,20 +13,17 @@ # limitations under the License. import logging -from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool -from .TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') -DB_NAME = get_setting('CRDB_DATABASE', default=None) class TelemetryDB(Database): - def __init__(self) -> None: + def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') - super().__init__(DB_NAME, Model) + super().__init__(model) def select_with_filter(self, model, filter_object): """ @@ -37,11 +34,11 @@ class TelemetryDB(Database): """ session = self.Session() try: - query = session.query(Model) + query = session.query(model) if filter_object.kpi_id: - query = query.filter(Model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) - return super().select_with_filter(query, session, Model) + return super().select_with_filter(query, session, model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 43ac9455ebf291ab51dff3eb3d33d5072fce1ffa..61446fcf81f355aef38ae69dbf8f14dc9a645f3f 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -26,7 +26,7 @@ from common.proto.telemetry_frontend_pb2_grpc import TelemetryFrontendServiceSer from telemetry.database.TelemetryModel import Collector as CollectorModel # from telemetry.database.Telemetry_DB import TelemetryDB -from ...database.Telemetry_DB import TelemetryDB +from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer @@ -41,7 +41,7 @@ ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.tele_db_obj = TelemetryDB() + self.tele_db_obj = TelemetryDB(CollectorModel) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'frontend', diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 60b96646afb49106eca030b78458ad2790bcb26f..c18f7ba4c06c311807746d4a479279fb622f0054 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server - metrics_port = int(get_metrics_port()) + metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = TelemetryFrontendService()