diff --git a/src/analytics/database/new_Analyzer_DB.py b/src/analytics/database/new_Analyzer_DB.py new file mode 100644 index 0000000000000000000000000000000000000000..3b3a746410b65393ddf455847d4208e465e6914f --- /dev/null +++ b/src/analytics/database/new_Analyzer_DB.py @@ -0,0 +1,67 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from common.method_wrappers.Decorator import MetricsPool +from .AnalyzerModel import Analyzer as Model +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('KpiManager', 'Database') + +class AnalyzerDB(Database): + def __init__(self) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__("tfs_kpi_mgmt", Model) + + def add_row_to_db(self, row): + return super().add_row_to_db(row) + + def search_db_row_by_id(self, model, col_name, id_to_search): + return super().search_db_row_by_id(model, col_name, id_to_search) + + def delete_db_row_by_id(self, model, col_name, id_to_search): + return super().delete_db_row_by_id(model, col_name, id_to_search) + + def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. + return: SQLAlchemy session, query and Model + """ + session = self.Session() + try: + query = session.query(Model) + # Apply filters based on the filter_object + if filter_object.analyzer_id: + query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + + if filter_object.algorithm_names: + query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names)) + + if filter_object.input_kpi_ids: + input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] + query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids)) + + if filter_object.output_kpi_ids: + output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] + query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids)) + except Exception as e: + LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, Model) \ No newline at end of file diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3..5ea0e2ef60227ecf05ae89b57f496ac86d3631b5 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -25,7 +25,7 @@ from common.proto.context_pb2 import Empty from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer -from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database. new_Analyzer_DB import AnalyzerDB from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger @@ -84,7 +84,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) self.kafka_producer.flush() - # self.StartResponseListener(analyzer_uuid) def StartResponseListener(self, filter_key=None): """ @@ -209,6 +208,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) print ('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) + else: + LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index d2428c01fb021f71a884d9a99c446bfef6e66559..f7a25f4c78547e7b1ddb8ff304c1568ca46b79e5 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) # ----- core funtionality test ----- # def test_StartAnalytics(analyticsFrontend_client): @@ -102,27 +102,19 @@ def test_StartStopAnalyzers(analyticsFrontend_client): LOGGER.info('--> StartAnalyzer') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) - LOGGER.info('--> StopAnalyzer') - response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) - LOGGER.debug(str(response)) + assert isinstance(added_analyzer_id, AnalyzerId) -# def test_SelectAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_SelectAnalytics START: <<< ') -# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerList) +def test_StopAnalytic(analyticsFrontend_client): + LOGGER.info(' >>> test_StopAnalytic START: <<< ') + response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) -# def test_StopAnalytic(analyticsFrontend_client): -# LOGGER.info(' >>> test_StopAnalytic START: <<< ') -# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_SelectAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_SelectAnalytics START: <<< ') + response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, AnalyzerList) # def test_ResponseListener(): # LOGGER.info(' >>> test_ResponseListener START <<< ') diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py index 58e7d0167044bb461e66b053dcb3999641ea8419..f944fc0b506832fbd4ccd7bb5ac93837b102df3f 100644 --- a/src/analytics/tests/test_analytics_db.py +++ b/src/analytics/tests/test_analytics_db.py @@ -14,7 +14,7 @@ import logging -from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database.new_Analyzer_DB import AnalyzerDB LOGGER = logging.getLogger(__name__) diff --git a/src/kpi_manager/database/new_KpiDB.py b/src/kpi_manager/database/new_KpiDB.py index 64d2dc10b2072b6610a0aa1a1b24162c4a398c0d..8aa1cd20bacdc39d6efe8f33a3c1475ead53c652 100644 --- a/src/kpi_manager/database/new_KpiDB.py +++ b/src/kpi_manager/database/new_KpiDB.py @@ -15,7 +15,7 @@ import logging from common.method_wrappers.Decorator import MetricsPool -from kpi_manager.database.KpiModel import Kpi as Model +from .KpiModel import Kpi as Model from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException