Commit 4d6ef02b authored by Waleed Akbar

Refactor AnalyticsFrontend to reflect Generic DB changes

parent 48748aa9
Merge requests: !294 Release TeraFlowSDN 4.0, !267 Resolve "(CTTC) Generalize Service Database management"
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from common.method_wrappers.Decorator import MetricsPool
from common.method_wrappers.ServiceExceptions import OperationFailedException
from common.tools.database.GenericDatabase import Database
from .AnalyzerModel import Analyzer as Model

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Analytics', 'Database')


class AnalyzerDB(Database):
    def __init__(self) -> None:
        LOGGER.info('Init AnalyzerDB')
        # The analyzer table lives in the shared tfs_kpi_mgmt database.
        super().__init__("tfs_kpi_mgmt", Model)
    def add_row_to_db(self, row):
        return super().add_row_to_db(row)

    def search_db_row_by_id(self, model, col_name, id_to_search):
        return super().search_db_row_by_id(model, col_name, id_to_search)

    def delete_db_row_by_id(self, model, col_name, id_to_search):
        return super().delete_db_row_by_id(model, col_name, id_to_search)
    def select_with_filter(self, model, filter_object):
        """
        Generic method that builds query filters dynamically from the attributes
        of filter_object.
        params: model:         SQLAlchemy model class to query.
                filter_object: object that carries the filtering criteria as attributes.
        return: the rows returned by the parent select_with_filter() for the built query.
        """
        session = self.Session()
        try:
            query = session.query(Model)
            # Apply filters based on the populated fields of filter_object
            if filter_object.analyzer_id:
                query = query.filter(Model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
            if filter_object.algorithm_names:
                query = query.filter(Model.algorithm_name.in_(filter_object.algorithm_names))
            if filter_object.input_kpi_ids:
                input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
                # '&&' is the PostgreSQL array-overlap operator: matches rows whose
                # input_kpi_ids share at least one element with input_kpi_uuids.
                query = query.filter(Model.input_kpi_ids.op('&&')(input_kpi_uuids))
            if filter_object.output_kpi_ids:
                output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
                query = query.filter(Model.output_kpi_ids.op('&&')(output_kpi_uuids))
        except Exception as e:
            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
            raise OperationFailedException("CreateAnalyzerFilter", extra_details=["unable to create the filter {:}".format(e)])
        return super().select_with_filter(query, session, Model)
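For context, a minimal usage sketch of this new module (imported elsewhere in this commit as analytics.database.new_Analyzer_DB). The AnalyzerFilter fields follow the proto imported by the servicer below; the algorithm name here is a made-up placeholder, not a value from this commit:

from common.proto.analytics_frontend_pb2 import AnalyzerFilter
from analytics.database.new_Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel

db_obj = AnalyzerDB()

# Only populated fields contribute WHERE clauses; empty ones are skipped.
filter_object = AnalyzerFilter()
filter_object.algorithm_names.append('some_algorithm')  # placeholder value

rows = db_obj.select_with_filter(AnalyzerModel, filter_object)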
@@ -25,7 +25,7 @@ from common.proto.context_pb2 import Empty
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
 from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
-from analytics.database.Analyzer_DB import AnalyzerDB
+from analytics.database.new_Analyzer_DB import AnalyzerDB
 from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
@@ -84,7 +84,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
         self.kafka_producer.flush()
-        # self.StartResponseListener(analyzer_uuid)

     def StartResponseListener(self, filter_key=None):
         """
@@ -209,6 +208,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
         if err:
             LOGGER.debug('Message delivery failed: {:}'.format(err))
             print('Message delivery failed: {:}'.format(err))
-        # else:
-        #     LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
-        #     print('Message delivered to topic {:}'.format(msg.topic()))
+        else:
+            LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+            print('Message delivered to topic {:}'.format(msg.topic()))
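The else-branch enabled above completes the standard confluent-kafka delivery-report pattern: the producer invokes the callback once per message, with err set only on failure. A minimal self-contained sketch for context (broker address and topic name are hypothetical, not taken from this commit):

from confluent_kafka import Producer

def delivery_callback(err, msg):
    # err is None on successful delivery; msg carries topic/partition/offset
    if err:
        print('Message delivery failed: {:}'.format(err))
    else:
        print('Message delivered to topic {:}'.format(msg.topic()))

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # hypothetical broker
producer.produce('analytics_request', value=b'{}', callback=delivery_callback)
producer.flush()  # block until all outstanding delivery reports arrive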
@@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
 ###########################
 # --- "test_validate_kafka_topics" should be executed before the functionality tests ---
-def test_validate_kafka_topics():
-    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
+# def test_validate_kafka_topics():
+#     LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
+#     response = KafkaTopic.create_all_topics()
+#     assert isinstance(response, bool)

 # ----- core functionality test -----
 # def test_StartAnalytics(analyticsFrontend_client):
@@ -102,27 +102,19 @@ def test_StartStopAnalyzers(analyticsFrontend_client):
     LOGGER.info('--> StartAnalyzer')
     added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
     LOGGER.debug(str(added_analyzer_id))
-    LOGGER.info(' --> Calling StartResponseListener... ')
-    class_obj = AnalyticsFrontendServiceServicerImpl()
-    response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid)
-    LOGGER.debug(response)
-    LOGGER.info("waiting for timer to comlete ...")
-    time.sleep(3)
-    LOGGER.info('--> StopAnalyzer')
-    response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
-    LOGGER.debug(str(response))
+    assert isinstance(added_analyzer_id, AnalyzerId)

-# def test_SelectAnalytics(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
-#     response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, AnalyzerList)
+def test_StopAnalytic(analyticsFrontend_client):
+    LOGGER.info(' >>> test_StopAnalytic START: <<< ')
+    response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
+    LOGGER.debug(str(response))
+    assert isinstance(response, Empty)

-# def test_StopAnalytic(analyticsFrontend_client):
-#     LOGGER.info(' >>> test_StopAnalytic START: <<< ')
-#     response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
+def test_SelectAnalytics(analyticsFrontend_client):
+    LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
+    response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
+    LOGGER.debug(str(response))
+    assert isinstance(response, AnalyzerList)

 # def test_ResponseListener():
 #     LOGGER.info(' >>> test_ResponseListener START <<< ')
...
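The reordered tests above still assume the Kafka topics exist before the functionality tests run, even though the topic-validation test is now disabled. One hypothetical alternative (not part of this commit) is a session-scoped pytest fixture; the KafkaTopic import path is an assumption, since it is not shown in this diff:

import pytest
from common.tools.kafka.Variables import KafkaTopic  # assumed import path

@pytest.fixture(scope='session', autouse=True)
def kafka_topics_ready():
    # create_all_topics() returns a bool, as the disabled test asserted
    assert isinstance(KafkaTopic.create_all_topics(), bool)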
@@ -14,7 +14,7 @@
 import logging
-from analytics.database.Analyzer_DB import AnalyzerDB
+from analytics.database.new_Analyzer_DB import AnalyzerDB
 LOGGER = logging.getLogger(__name__)
...
@@ -15,7 +15,7 @@
 import logging
 from common.method_wrappers.Decorator import MetricsPool
-from kpi_manager.database.KpiModel import Kpi as Model
+from .KpiModel import Kpi as Model
 from common.tools.database.GenericDatabase import Database
 from common.method_wrappers.ServiceExceptions import OperationFailedException
...
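The KpiDB hunk above and the AnalyzerDB class earlier now extend the same GenericDatabase parent. For orientation, an illustrative stub of the contract that parent appears to expose, inferred only from the calls made in this commit (the real implementation lives in common.tools.database.GenericDatabase; the engine URI is a placeholder):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class Database:
    def __init__(self, db_name, model):
        # placeholder URI; the actual engine configuration is in the common package
        self.db_engine = create_engine(f'postgresql://user:pass@localhost/{db_name}')
        self.Session   = sessionmaker(bind=self.db_engine)
        self.db_model  = model

    def add_row_to_db(self, row): ...                                  # insert one ORM row
    def search_db_row_by_id(self, model, col_name, id_to_search): ...  # SELECT by column value
    def delete_db_row_by_id(self, model, col_name, id_to_search): ...  # DELETE by column value
    def select_with_filter(self, query, session, model): ...           # run a prebuilt query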