Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# from kpi_manager.database.Kpi_DB import KpiDB
from common.proto.kpi_manager_pb2 import KpiDescriptorList
from .test_messages import create_kpi_filter_request
from kpi_manager.database.KpiModel import Kpi as KpiModel
from kpi_manager.database.KpiDB import KpiDB
# from common.tools.database.GenericDatabase import Database
LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
    """Smoke-test the KPI database: create its tables, then verify them.

    The test contains no explicit assertions; it passes when neither
    ``create_tables()`` nor ``verify_tables()`` raises.
    """
    LOGGER.info('>>> test_verify_Tables : START <<< ')
    # The handle must be created here: nothing else in this module binds
    # `kpiDBobj`, so using it unbound would raise NameError.
    # NOTE(review): the no-argument constructor mirrors the `KpiDB()` call in
    # this file's disabled select test -- confirm against KpiDB.__init__.
    kpiDBobj = KpiDB()
    # kpiDBobj.drop_database()
    # kpiDBobj.verify_tables()
    kpiDBobj.create_tables()
    kpiDBobj.verify_tables()
# def test_generic_DB_select_method():
# LOGGER.info("--> STARTED-test_generic_DB_select_method")
# kpi_obj = KpiDB()
# _filter = create_kpi_filter_request()
# # response = KpiDescriptorList()
# try:
# kpi_obj.select_with_filter(KpiModel, _filter)
# except Exception as e:
# LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e))
# LOGGER.info("--> FINISHED-test_generic_DB_select_method")
# # try:
# # for row in rows:
# # kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
# # response.kpi_descriptor_list.append(kpiDescriptor_obj)
# # return response
# # except Exception as e:
# # LOGGER.info('Unable to process filter response {:}'.format(e))
# # assert isinstance(r)