Loading src/analytics/backend/service/__main__.py +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server metrics_port = int(get_metrics_port()) metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsBackendService() Loading src/analytics/database/Analyzer_DB.py +8 −11 Original line number Diff line number Diff line Loading @@ -13,20 +13,17 @@ # limitations under the License. import logging from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool from .AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') DB_NAME = get_setting('CRDB_DATABASE', default=None) class AnalyzerDB(Database): def __init__(self) -> None: def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') super().__init__(DB_NAME, Model) super().__init__(model) def select_with_filter(self, model, filter_object): """ Loading @@ -37,23 +34,23 @@ class AnalyzerDB(Database): """ session = self.Session() try: query = session.query(Model) query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) return super().select_with_filter(query, session, Model) No newline at end of file return super().select_with_filter(query, session, model) No newline at end of file src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value self.db_obj = AnalyzerDB() self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) Loading src/analytics/frontend/service/__main__.py +1 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server metrics_port = int(get_metrics_port()) metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() Loading src/common/tools/database/GenericDatabase.py +5 −3 Original line number Diff line number Diff line Loading @@ -18,18 +18,20 @@ import sqlalchemy_utils from .GenericEngine import Engine from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: def __init__(self, db_name, model): self.db_engine = Engine.get_engine(db_name) def __init__(self, model): self.db_engine = Engine.get_engine(DB_NAME) if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') self.db_name = db_name self.db_name = DB_NAME self.db_model = model self.db_table = model.__name__ self.Session = sessionmaker(bind=self.db_engine) Loading Loading
src/analytics/backend/service/__main__.py +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ def main(): LOGGER.info('Starting...') # Start metrics server metrics_port = int(get_metrics_port()) metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsBackendService() Loading
src/analytics/database/Analyzer_DB.py +8 −11 Original line number Diff line number Diff line Loading @@ -13,20 +13,17 @@ # limitations under the License. import logging from common.Settings import get_setting from common.method_wrappers.Decorator import MetricsPool from .AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiManager', 'Database') DB_NAME = get_setting('CRDB_DATABASE', default=None) class AnalyzerDB(Database): def __init__(self) -> None: def __init__(self, model) -> None: LOGGER.info('Init KpiManagerService') super().__init__(DB_NAME, Model) super().__init__(model) def select_with_filter(self, model, filter_object): """ Loading @@ -37,23 +34,23 @@ class AnalyzerDB(Database): """ session = self.Session() try: query = session.query(Model) query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) return super().select_with_filter(query, session, Model) No newline at end of file return super().select_with_filter(query, session, model) No newline at end of file
src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value self.db_obj = AnalyzerDB() self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) Loading
src/analytics/frontend/service/__main__.py +1 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ def main(): kpiDBobj.create_tables() # Start metrics server metrics_port = int(get_metrics_port()) metrics_port = get_metrics_port() start_http_server(metrics_port) grpc_service = AnalyticsFrontendService() Loading
src/common/tools/database/GenericDatabase.py +5 −3 Original line number Diff line number Diff line Loading @@ -18,18 +18,20 @@ import sqlalchemy_utils from .GenericEngine import Engine from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker from common.Settings import get_setting from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) DB_NAME = get_setting('CRDB_DATABASE', default=None) class Database: def __init__(self, db_name, model): self.db_engine = Engine.get_engine(db_name) def __init__(self, model): self.db_engine = Engine.get_engine(DB_NAME) if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') raise Exception('Failed to initialize the database engine.') self.db_name = db_name self.db_name = DB_NAME self.db_model = model self.db_table = model.__name__ self.Session = sessionmaker(bind=self.db_engine) Loading