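# NOTE(review): the words "Newer" / "Older" stood here as bare text; presumably
# navigation labels left over from a web diff/blame view, not part of the script.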
# Add the SDK directory to the module search path (sys.path) using a path
# relative to this script, so the import below works regardless of the
# directory the script is launched from.
script_dir = os.path.dirname(os.path.abspath(__file__)) # Current script directory
sdk_path = os.path.join(script_dir, '..', 'sdk') # Go up one level from the script directory and point to 'sdk'
# Inserted at position 0 so this directory is searched before every other entry.
sys.path.insert(0, sdk_path)
# Must stay after the sys.path change above, which it relies on.
from sdk import CAPIFProviderConnector, CAPIFInvokerConnector, ServiceDiscoverer
# SDK configuration file that this script reads and rewrites. The path is
# relative, so it is resolved against the current working directory.
capif_sdk_config_path = "./capif-sdk-config-sample-test.json"
def preparation_for_update(APFs, AEFs, second_netapp_api):
    """Rewrite the SDK configuration file and return a connector built from it.

    The configuration at ``capif_sdk_config_path`` is loaded, its ``apfs``,
    ``aefs`` and ``api_description_path`` entries are replaced, and the result
    is written back to the same file.

    Args:
        APFs: Value stored under the ``apfs`` key of the configuration.
        AEFs: Value stored under the ``aefs`` key of the configuration.
        second_netapp_api: When truthy, the API description path is set to
            ``./netapp-provider-api-spec-2.json``; otherwise to
            ``./netapp-provider-api-spec-3.json``.

    Returns:
        A ``CAPIFProviderConnector`` created from the updated configuration
        file.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
        json.JSONDecodeError: If the configuration file is not valid JSON.
    """
    with open(capif_sdk_config_path, 'r') as file:
        config = json.load(file)

    config['apfs'] = APFs
    config['aefs'] = AEFs

    # Exactly one description is selected. This mirrors ensure_update, which
    # looks up 'Test-2' for the second NetApp API and 'Test-3' otherwise.
    if second_netapp_api:
        config['api_description_path'] = "./netapp-provider-api-spec-2.json"
    else:
        config['api_description_path'] = "./netapp-provider-api-spec-3.json"

    # Persist before building the connector: the connector is given the file
    # path, not the in-memory dict.
    with open(capif_sdk_config_path, 'w') as file:
        json.dump(config, file, indent=4)

    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    return capif_provider_connector
def ensure_update(Chosen_apf, Chosen_aefs, second_netapp_api):
    """Publish an API with the selected provider functions, then update and query it.

    Steps, in order:
      1. Read the SDK configuration and ``capif_provider_details.json``.
      2. Write the resolved APF/AEF ids into ``publish_req`` and save the
         configuration.
      3. Publish the services through a new ``CAPIFProviderConnector``.
      4. Read ``Published-Apis.json``, store the resulting ``service_api_id``
         in the configuration, and save it again.
      5. Call ``update_service``, ``get_all_services`` and ``get_service`` on
         a connector built from the final configuration.

    Args:
        Chosen_apf: Key in ``capif_provider_details.json`` holding the APF id.
            Only used when ``second_netapp_api`` is truthy.
        Chosen_aefs: Sequence of keys in ``capif_provider_details.json``
            holding the AEF ids. Only used when ``second_netapp_api`` is
            truthy.
        second_netapp_api: When truthy, the chosen keys are used and the
            published API is looked up as ``'Test-2'``. Otherwise the fixed
            keys ``APF-1``, ``AEF-1`` and ``AEF-2`` are used and the API is
            looked up as ``'Test-3'``.

    Raises:
        ValueError: If ``provider_folder`` or ``capif_username`` is missing
            from the configuration, or any requested id is missing from the
            provider details file.
        FileNotFoundError: If the provider details file or the published-APIs
            file does not exist.
        json.JSONDecodeError: If any of the JSON files is malformed.
    """
    # Both scenarios run the same steps; only the keys looked up differ.
    if second_netapp_api:
        apf_key = Chosen_apf
        aef_keys = list(Chosen_aefs)
    else:
        apf_key = 'APF-1_api_prov_func_id'
        aef_keys = ['AEF-1_api_prov_func_id', 'AEF-2_api_prov_func_id']

    # Get AEFs ids and APFs ids to publish an API
    with open(capif_sdk_config_path, 'r') as file:
        config = json.load(file)

    provider_folder = config.get('provider_folder')
    username_folder = config.get('capif_username')

    if not provider_folder:
        raise ValueError("'provider_folder' value is not defined in the configuration file.")
    # Without this check a missing username reaches os.path.join as None and
    # fails with an unhelpful TypeError.
    if not username_folder:
        raise ValueError("'capif_username' value is not defined in the configuration file.")

    detailspath = os.path.join(provider_folder, username_folder, "capif_provider_details.json")
    if not os.path.exists(detailspath):
        raise FileNotFoundError(f"File {detailspath} not found")

    with open(detailspath, 'r') as file:
        details = json.load(file)

    apf_id = details.get(apf_key)
    aef_ids = [details.get(key) for key in aef_keys]

    # Every requested id must be present, so that a null id is never written
    # into the publish request.
    if not apf_id or not aef_ids or not all(aef_ids):
        raise ValueError("Not all necessary values were found in 'Capif_provider_details.json'")

    config['publish_req']['publisher_apf_id'] = apf_id
    config['publish_req']['publisher_aefs_ids'] = aef_ids

    with open(capif_sdk_config_path, 'w') as file:
        json.dump(config, file, indent=4)  # Save the formatted JSON
    print("Configuration file updated successfully.")

    # The connector is built from the file on disk, so it has to be created
    # after the configuration has been saved.
    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    capif_provider_connector.publish_services()
    print("PROVIDER PUBLISH COMPLETED")

    published_apis_path = os.path.join(provider_folder, username_folder, "Published-Apis.json")
    if not os.path.exists(published_apis_path):
        raise FileNotFoundError(f"File {published_apis_path} not found")

    with open(published_apis_path, 'r') as file:
        published_apis = json.load(file)

    service_api_id = published_apis.get('Test-2' if second_netapp_api else 'Test-3')

    # The configuration is read again instead of reusing the in-memory copy,
    # as the original flow did.
    # NOTE(review): presumably in case publishing changed the file - confirm.
    with open(capif_sdk_config_path, 'r') as file:
        config = json.load(file)

    config['publish_req']['service_api_id'] = service_api_id

    with open(capif_sdk_config_path, 'w') as file:
        json.dump(config, file, indent=4)  # Save the formatted JSON

    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    capif_provider_connector.update_service()
    print("PROVIDER UPDATE SERVICE COMPLETED")

    capif_provider_connector.get_all_services()
    print("PROVIDER GET ALL SERVICES COMPLETED")

    capif_provider_connector.get_service()
    print("PROVIDER GET SERVICE COMPLETED")
# End-to-end scenario, run when the script executes: provider onboarding,
# publish/update/query of the 'Test' API, invoker onboarding, discovery and
# update, unpublish, two further publish/update rounds through
# preparation_for_update + ensure_update, and finally provider offboarding.
# Any failure is reported on stdout by the handlers at the bottom.
try:
    # Initialization of the connector
    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    capif_provider_connector.onboard_provider()
    print("PROVIDER ONBOARDING COMPLETED")

    # Get AEFs ids and APFs ids to publish an API
    with open(capif_sdk_config_path, 'r') as file:
        config = json.load(file)

    provider_folder = config.get('provider_folder')
    username_folder = config.get('capif_username')

    if not provider_folder:
        raise ValueError("'provider_folder' value is not defined in the configuration file.")

    detailspath = os.path.join(provider_folder, username_folder, "capif_provider_details.json")
    if not os.path.exists(detailspath):
        raise FileNotFoundError(f"File {detailspath} not found")

    with open(detailspath, 'r') as file:
        details = json.load(file)

    APF = details.get('APF-1_api_prov_func_id')
    AEF1 = details.get('AEF-1_api_prov_func_id')
    AEF2 = details.get('AEF-2_api_prov_func_id')

    if not APF or not AEF1 or not AEF2:
        raise ValueError("Not all necessary values were found in 'Capif_provider_details.json'")

    config['publish_req']['publisher_apf_id'] = APF
    config['publish_req']['publisher_aefs_ids'] = [AEF1, AEF2]

    with open(capif_sdk_config_path, 'w') as file:
        json.dump(config, file, indent=4)  # Save the formatted JSON
    print("Configuration file updated successfully.")

    # Update the constructor with new configuration parameters
    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    capif_provider_connector.publish_services()
    print("PROVIDER PUBLISH COMPLETED")

    # The path and the parsed content have separate names, so the error
    # message below always shows the path.
    published_apis_path = os.path.join(provider_folder, username_folder, "Published-Apis.json")
    if not os.path.exists(published_apis_path):
        raise FileNotFoundError(f"File {published_apis_path} not found")

    with open(published_apis_path, 'r') as file:
        PublishedApis = json.load(file)

    service_api_id = PublishedApis.get('Test')

    # The configuration is read again instead of reusing the in-memory copy,
    # as the original flow did.
    # NOTE(review): presumably in case publishing changed the file - confirm.
    with open(capif_sdk_config_path, 'r') as file:
        config = json.load(file)

    config['publish_req']['service_api_id'] = service_api_id

    with open(capif_sdk_config_path, 'w') as file:
        json.dump(config, file, indent=4)  # Save the formatted JSON

    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
    capif_provider_connector.update_service()
    print("PROVIDER UPDATE COMPLETED")

    capif_provider_connector.get_all_services()
    print("PROVIDER GET ALL SERVICES COMPLETED")

    capif_provider_connector.get_service()
    print("PROVIDER GET SERVICE COMPLETED")

    # Invoker side: onboard, discover the published services, fetch tokens,
    # update and offboard.
    capif_invoker_connector = CAPIFInvokerConnector(config_file=capif_sdk_config_path)
    capif_invoker_connector.onboard_invoker()
    print("INVOKER ONBOARDING COMPLETED")

    discoverer = ServiceDiscoverer(config_file=capif_sdk_config_path)
    discoverer.discover()
    print("SERVICE DISCOVER COMPLETED")

    discoverer.get_tokens()

    capif_invoker_connector.update_invoker()
    print("INVOKER UPDATE SERVICE COMPLETED")

    capif_invoker_connector.offboard_invoker()

    capif_provider_connector.unpublish_service()
    print("PROVIDER UNPUBLISH SERVICE COMPLETED")

    # Round 1: configuration rewritten with "2"/"4" and the second NetApp API
    # description, then published with APF-2 and three AEFs.
    capif_provider_connector = preparation_for_update("2", "4", True)
    Chosen_apf = "APF-2_api_prov_func_id"
    Chosen_aefs = ["AEF-1_api_prov_func_id", "AEF-3_api_prov_func_id", "AEF-4_api_prov_func_id"]
    ensure_update(Chosen_apf, Chosen_aefs, True)

    # Round 2: back to "1"/"2" with the other API description.
    capif_provider_connector = preparation_for_update("1", "2", False)
    Chosen_apf = "APF-1_api_prov_func_id"
    Chosen_aefs = ["AEF-1_api_prov_func_id", "AEF-2_api_prov_func_id"]
    ensure_update(Chosen_apf, Chosen_aefs, False)

    capif_provider_connector.offboard_provider()
    print("ALL TESTS PASSED CORRECTLY")

except FileNotFoundError as e:
    print(f"Error: {e}")
except json.JSONDecodeError as e:
    print(f"Error reading the JSON file: {e}")
except Exception as e:
    # Top-level boundary of the script: report anything else, including the
    # ValueError raised above, instead of ending with a traceback.
    print(f"Unexpected error: {e}")