Skip to content
Snippets Groups Projects
Commit 49070d98 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

Merge branch 'feat/190-cttc-generalize-service-database-management' into...

Merge branch 'feat/190-cttc-generalize-service-database-management' into feat/183-create-qosprofile-component
parents 44619421 56e5fd59
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!257Resolve "Create QoSProfile component"
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from kpi_manager.database.KpiModel import Kpi as KpiModel
from common.tools.database.GenericDatabase import Database
from common.method_wrappers.ServiceExceptions import OperationFailedException
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiManager', 'Database')
class KpiDB(Database):
def __init__(self) -> None:
LOGGER.info('Init KpiManagerService')
super().__init__("tfs_kpi_mgmt", KpiModel)
def add_row_to_db(self, row):
return super().add_row_to_db(row)
def search_db_row_by_id(self, model, col_name, id_to_search):
return super().search_db_row_by_id(model, col_name, id_to_search)
def delete_db_row_by_id(self, model, col_name, id_to_search):
return super().delete_db_row_by_id(model, col_name, id_to_search)
def select_with_filter(self, model, filter_object):
"""
Generic method to create filters dynamically based on filter_object attributes.
params: model: SQLAlchemy model class to query.
filter_object: Object that contains filtering criteria as attributes.
return: SQLAlchemy session, query and Model
"""
session = self.Session()
try:
query = session.query(KpiModel)
# Apply filters based on the filter_object
if filter_object.kpi_id:
query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
if filter_object.kpi_sample_type:
query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type))
if filter_object.device_id:
query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
if filter_object.endpoint_id:
query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
if filter_object.service_id:
query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
if filter_object.slice_id:
query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
if filter_object.connection_id:
query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
if filter_object.link_id:
query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
except Exception as e:
LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
return super().select_with_filter(query, session, KpiModel)
......@@ -18,7 +18,8 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m
from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from kpi_manager.database.Kpi_DB import KpiDB
# from kpi_manager.database.Kpi_DB import KpiDB
from kpi_manager.database.new_KpiDB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel
LOGGER = logging.getLogger(__name__)
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -14,7 +14,12 @@
import logging
from kpi_manager.database.Kpi_DB import KpiDB
# from kpi_manager.database.Kpi_DB import KpiDB
from common.proto.kpi_manager_pb2 import KpiDescriptorList
from .test_messages import create_kpi_filter_request
from kpi_manager.database.KpiModel import Kpi as KpiModel
from kpi_manager.database.new_KpiDB import KpiDB
# from common.tools.database.GenericDatabase import Database
LOGGER = logging.getLogger(__name__)
......@@ -26,3 +31,22 @@ def test_verify_databases_and_Tables():
kpiDBobj.create_database()
kpiDBobj.create_tables()
kpiDBobj.verify_tables()
# def test_generic_DB_select_method():
# LOGGER.info("--> STARTED-test_generic_DB_select_method")
# kpi_obj = KpiDB()
# _filter = create_kpi_filter_request()
# # response = KpiDescriptorList()
# try:
# kpi_obj.select_with_filter(KpiModel, _filter)
# except Exception as e:
# LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e))
# LOGGER.info("--> FINISHED-test_generic_DB_select_method")
# # try:
# # for row in rows:
# # kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
# # response.kpi_descriptor_list.append(kpiDescriptor_obj)
# # return response
# # except Exception as e:
# # LOGGER.info('Unable to process filter response {:}'.format(e))
# # assert isinstance(r)
......@@ -139,9 +139,9 @@ def test_SelectKpiDescriptor(kpi_manager_client):
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiDescriptorList)
def test_set_list_of_KPIs(kpi_manager_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# adding KPI
for kpi in KPIs_TO_SEARCH:
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
# def test_set_list_of_KPIs(kpi_manager_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# # adding KPI
# for kpi in KPIs_TO_SEARCH:
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment