"""End-to-end exercise of the CAPIF SDK provider and invoker connectors.

The script onboards a provider, publishes/updates/unpublishes services,
onboards and offboards an invoker, then updates the provider twice and
repeats the publish cycle after each update. All state is exchanged with the
SDK through the JSON configuration file at ``capif_sdk_config_path``.
"""
import sys
import os
import json
# flake8: noqa

# Add the SDK directory to PYTHONPATH using a relative path
script_dir = os.path.dirname(os.path.abspath(__file__))  # Current script directory
sdk_path = os.path.join(script_dir, '..', 'sdk')  # Go up one level and point to 'sdk'
sys.path.insert(0, sdk_path)

from sdk import CAPIFProviderConnector, CAPIFInvokerConnector, ServiceDiscoverer

capif_sdk_config_path = "./capif-sdk-config-sample-test.json"


def _load_json(path):
    """Parse and return the JSON document stored at *path*."""
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)


def _load_required_json(path):
    """Return the parsed JSON at *path*.

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"File {path} not found")
    return _load_json(path)


def _save_config(config):
    """Write *config* back to the SDK configuration file as indented JSON."""
    with open(capif_sdk_config_path, 'w', encoding='utf-8') as file:
        json.dump(config, file, indent=4)  # Save the formatted JSON


def _provider_directory(config):
    """Return ``<provider_folder>/<capif_username>`` built from *config*.

    Raises:
        ValueError: If either configuration value is missing or empty.
    """
    provider_folder = config.get('provider_folder')
    username_folder = config.get('capif_username')
    if not provider_folder:
        raise ValueError("'provider_folder' value is not defined in the configuration file.")
    if not username_folder:
        # Without this check os.path.join would fail with an opaque TypeError.
        raise ValueError("'capif_username' value is not defined in the configuration file.")
    return os.path.join(provider_folder, username_folder)


def _set_publisher_ids(apf_key, aef_keys):
    """Copy the APF/AEF ids named by the given keys into the SDK config.

    The ids are looked up in ``capif_provider_details.json`` inside the
    provider directory and stored under ``publish_req`` in the configuration
    file.

    Args:
        apf_key: Key of the APF id in ``capif_provider_details.json``.
        aef_keys: Keys of the AEF ids, in the order they should be stored.

    Returns:
        The provider directory path, so callers can locate sibling files.

    Raises:
        ValueError: If a configuration value or any requested id is missing.
        FileNotFoundError: If ``capif_provider_details.json`` does not exist.
    """
    config = _load_json(capif_sdk_config_path)
    provider_dir = _provider_directory(config)
    details = _load_required_json(
        os.path.join(provider_dir, "capif_provider_details.json")
    )

    apf_id = details.get(apf_key)
    aef_ids = [details.get(key) for key in aef_keys]
    # Every requested id must be present; a single missing AEF would
    # otherwise be written to the config as null.
    if not apf_id or not aef_ids or not all(aef_ids):
        raise ValueError("Not all necessary values were found in 'capif_provider_details.json'")

    # Update configuration file
    config['publish_req']['publisher_apf_id'] = apf_id
    config['publish_req']['publisher_aefs_ids'] = aef_ids
    _save_config(config)
    print("Configuration file updated successfully.")
    return provider_dir


def _publish_and_update(apf_key, aef_keys, api_name, update_message):
    """Publish services, then update and query the API called *api_name*.

    Args:
        apf_key: Key of the APF id in ``capif_provider_details.json``.
        aef_keys: Keys of the AEF ids in ``capif_provider_details.json``.
        api_name: Key in ``published-Apis.json`` whose value is stored as
            ``publish_req.service_api_id`` before the update.
        update_message: Text printed after ``update_service()`` returns.

    Returns:
        The provider connector built after ``service_api_id`` was written,
        so the caller can unpublish the service.

    Raises:
        ValueError: If a configuration value or any requested id is missing.
        FileNotFoundError: If a required JSON file does not exist.
    """
    provider_dir = _set_publisher_ids(apf_key, aef_keys)

    # The connector is rebuilt after each config write, as the original
    # script did, so it is constructed from the file's current contents.
    connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    connector.publish_services()
    print("PROVIDER PUBLISH COMPLETED")

    published_apis = _load_required_json(
        os.path.join(provider_dir, "published-Apis.json")
    )
    config = _load_json(capif_sdk_config_path)
    config['publish_req']['service_api_id'] = published_apis.get(api_name)
    _save_config(config)

    connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    connector.update_service()
    print(update_message)

    connector.get_all_services()
    print("PROVIDER GET ALL SERVICES COMPLETED")

    connector.get_service()
    print("PROVIDER GET SERVICE COMPLETED")
    return connector


def preparation_for_update(APFs, AEFs, second_netapp_api):
    """Rewrite the SDK config for a provider update and build a connector.

    Args:
        APFs: Value stored under the ``apfs`` key of the configuration file.
        AEFs: Value stored under the ``aefs`` key of the configuration file.
        second_netapp_api: When truthy ``api_description_path`` is set to
            ``./netapp-provider-api-spec-2.json``, otherwise to
            ``./netapp-provider-api-spec-3.json``.

    Returns:
        A ``CAPIFProviderConnector`` created from the rewritten file.
    """
    config = _load_json(capif_sdk_config_path)
    config['apfs'] = APFs
    config['aefs'] = AEFs
    if second_netapp_api:
        config['api_description_path'] = "./netapp-provider-api-spec-2.json"
    else:
        config['api_description_path'] = "./netapp-provider-api-spec-3.json"
    _save_config(config)

    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    return capif_provider_connector


def ensure_update(Chosen_apf, Chosen_aefs, second_netapp_api):
    """Run a publish/update/get/unpublish cycle with the chosen functions.

    Args:
        Chosen_apf: Key of the APF id in ``capif_provider_details.json``.
        Chosen_aefs: Keys of the AEF ids in ``capif_provider_details.json``;
            every key must resolve to a non-empty id.
        second_netapp_api: When truthy the API published as ``Test-2`` is
            updated, otherwise the one published as ``Test-3``.

    Raises:
        ValueError: If a configuration value or any requested id is missing.
        FileNotFoundError: If a required JSON file does not exist.
    """
    api_name = 'Test-2' if second_netapp_api else 'Test-3'
    capif_provider_connector = _publish_and_update(
        Chosen_apf,
        Chosen_aefs,
        api_name,
        "PROVIDER UPDATE SERVICE COMPLETED",
    )
    capif_provider_connector.unpublish_service()


def main():
    """Run the full provider/invoker scenario, printing progress and errors."""
    try:
        # Initialization of the connector
        capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)

        capif_provider_connector.onboard_provider()
        print("PROVIDER ONBOARDING COMPLETED")

        # Get AEFs ids and APFs ids to publish an API, then publish and
        # update it with a connector rebuilt from the new configuration.
        capif_provider_connector = _publish_and_update(
            'APF-1_api_prov_func_id',
            ['AEF-1_api_prov_func_id', 'AEF-2_api_prov_func_id'],
            'Test',
            "PROVIDER UPDATE COMPLETED",
        )

        capif_invoker_connector = CAPIFInvokerConnector(config_file=capif_sdk_config_path)

        capif_invoker_connector.onboard_invoker()
        print("INVOKER ONBOARDING COMPLETED")

        discoverer = ServiceDiscoverer(config_file=capif_sdk_config_path)

        discoverer.discover()
        print("SERVICE DISCOVER COMPLETED")

        discoverer.get_tokens()
        print("SERVICE GET TOKENS COMPLETED")

        capif_invoker_connector.update_invoker()
        print("INVOKER UPDATE SERVICE COMPLETED")

        capif_invoker_connector.offboard_invoker()
        print("INVOKER OFFBOARD COMPLETED")

        capif_provider_connector.unpublish_service()
        print("PROVIDER UNPUBLISH SERVICE COMPLETED")

        capif_provider_connector = preparation_for_update("2", "4", True)
        capif_provider_connector.update_provider()

        Chosen_apf = "APF-2_api_prov_func_id"
        Chosen_aefs = [
            "AEF-1_api_prov_func_id",
            "AEF-3_api_prov_func_id",
            "AEF-4_api_prov_func_id",
        ]
        ensure_update(Chosen_apf, Chosen_aefs, True)
        print("PROVIDER UPDATE ONE COMPLETED")

        capif_provider_connector = preparation_for_update("1", "2", False)
        capif_provider_connector.update_provider()

        Chosen_apf = "APF-1_api_prov_func_id"
        Chosen_aefs = ["AEF-1_api_prov_func_id", "AEF-2_api_prov_func_id"]
        ensure_update(Chosen_apf, Chosen_aefs, False)
        print("PROVIDER UPDATE TWO COMPLETED")

        capif_provider_connector.offboard_provider()
        print("PROVIDER OFFBOARDING COMPLETED")

        print("ALL TESTS PASSED CORRECTLY")
    except FileNotFoundError as e:
        print(f"Error: {e}")
    except json.JSONDecodeError as e:
        print(f"Error reading the JSON file: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()