# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging

#from common.proto.kpi_manager_pb2 import KpiDescriptorList
#from .test_messages import create_kpi_filter_request
from kpi_manager.database.KpiDB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel
# from common.tools.database.GenericDatabase import Database

LOGGER = logging.getLogger(__name__)


def test_verify_databases_and_Tables():
    """Run the KpiDB create/verify sequence against the Kpi model.

    Builds a ``KpiDB`` from ``KpiModel`` and then calls, in order,
    ``create_database()``, ``create_tables()`` and ``verify_tables()``.
    The test contains no explicit assertions: it passes as long as none
    of those calls raises.
    """
    LOGGER.info('>>> test_verify_Tables : START <<< ')

    kpi_db = KpiDB(KpiModel)

    # Disabled steps, kept for reference:
    # kpi_db.drop_database()
    # kpi_db.verify_tables()

    kpi_db.create_database()
    kpi_db.create_tables()
    kpi_db.verify_tables()


# Disabled draft of a select-with-filter test, kept for reference.
#
# def test_generic_DB_select_method():
#     LOGGER.info("--> STARTED-test_generic_DB_select_method")
#     kpi_obj = KpiDB()
#     _filter = create_kpi_filter_request()
#     # response = KpiDescriptorList()
#     try:
#         kpi_obj.select_with_filter(KpiModel, _filter)
#     except Exception as e:
#         LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e))
#     LOGGER.info("--> FINISHED-test_generic_DB_select_method")
#     # try:
#     #     for row in rows:
#     #         kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
#     #         response.kpi_descriptor_list.append(kpiDescriptor_obj)
#     #     return response
#     # except Exception as e:
#     #     LOGGER.info('Unable to process filter response {:}'.format(e))
#     # assert isinstance(r)