Skip to content
Snippets Groups Projects
test.py 9.99 KiB
Newer Older
import sys
import os
import json
JorgeEcheva26's avatar
JorgeEcheva26 committed
# flake8: noqa
# Add the SDK directory to the module search path (sys.path) using a path
# relative to this script, so that the `from sdk import ...` below can resolve.
script_dir = os.path.dirname(os.path.abspath(__file__))  # Directory containing this script
sdk_path = os.path.join(script_dir, '..', 'sdk')  # Go up one level from the script directory and point at 'sdk'
sys.path.insert(0, sdk_path)
from sdk import CAPIFProviderConnector, CAPIFInvokerConnector, ServiceDiscoverer


# SDK configuration file that this script reads and rewrites between steps.
# The path is relative, so it is resolved against the current working directory.
capif_sdk_config_path = "./capif-sdk-config-sample-test.json"

JorgeEcheva26's avatar
JorgeEcheva26 committed
def preparation_for_update(APFs, AEFs, second_netapp_api):
    """Rewrite the SDK config for a provider update and return a new connector.

    The configuration file at ``capif_sdk_config_path`` is loaded, its
    ``apfs``, ``aefs`` and ``api_description_path`` entries are overwritten,
    and the result is saved back with a four-space indent.

    Args:
        APFs: Value stored under the ``apfs`` key of the configuration.
        AEFs: Value stored under the ``aefs`` key of the configuration.
        second_netapp_api: When truthy, ``api_description_path`` is set to
            ``./netapp-provider-api-spec-2.json``; otherwise to
            ``./netapp-provider-api-spec-3.json``.

    Returns:
        A ``CAPIFProviderConnector`` built from the rewritten config file.
    """
    with open(capif_sdk_config_path, 'r') as config_file:
        settings = json.load(config_file)

    settings['apfs'] = APFs
    settings['aefs'] = AEFs
    settings['api_description_path'] = (
        "./netapp-provider-api-spec-2.json"
        if second_netapp_api
        else "./netapp-provider-api-spec-3.json"
    )

    with open(capif_sdk_config_path, 'w') as config_file:
        json.dump(settings, config_file, indent=4)

    # The connector is created only after the file is saved, because it is
    # constructed from the config file path rather than from the dict.
    return CAPIFProviderConnector(config_file=capif_sdk_config_path)

def _read_json(path):
    """Return the parsed JSON content of the file at ``path``."""
    with open(path, 'r') as file:
        return json.load(file)


def _write_json(path, data):
    """Write ``data`` to ``path`` as JSON indented by four spaces."""
    with open(path, 'w') as file:
        json.dump(data, file, indent=4)


def ensure_update(Chosen_apf, Chosen_aefs, second_netapp_api):
    """Publish an API with the chosen provider functions, then update and query it.

    Steps performed:

    1. Read the SDK config at ``capif_sdk_config_path`` and locate
       ``capif_provider_details.json`` under
       ``<provider_folder>/<capif_username>/``.
    2. Look up the ids stored under ``Chosen_apf`` and every key of
       ``Chosen_aefs`` in that details file and store them in
       ``config['publish_req']`` (``publisher_apf_id`` /
       ``publisher_aefs_ids``).
    3. Publish the services with a connector built from the saved config.
    4. Read ``Published-Apis.json`` from the same folder, take the id stored
       under ``'Test-2'`` (``second_netapp_api`` truthy) or ``'Test-3'``
       (otherwise), and save it as ``publish_req.service_api_id``.
    5. With a new connector, call ``update_service``, ``get_all_services``
       and ``get_service``.

    Args:
        Chosen_apf: Key in the provider details file whose value is the APF id.
        Chosen_aefs: Sequence of keys in the provider details file whose
            values are the AEF ids, in the order they should be published.
        second_netapp_api: Selects which published API name is looked up in
            ``Published-Apis.json`` (``'Test-2'`` when truthy, else ``'Test-3'``).

    Raises:
        ValueError: If ``provider_folder`` is missing from the config, or if
            the APF id or any of the requested AEF ids is missing or empty.
        FileNotFoundError: If the provider details file or
            ``Published-Apis.json`` does not exist.
    """
    config = _read_json(capif_sdk_config_path)

    provider_folder = config.get('provider_folder')
    username_folder = config.get('capif_username')
    if not provider_folder:
        raise ValueError("El valor 'provider_folder' no está definido en el archivo de configuración.")

    detailspath = os.path.join(provider_folder, username_folder, "capif_provider_details.json")
    if not os.path.exists(detailspath):
        raise FileNotFoundError(f"No se encontró el archivo {detailspath}")

    details = _read_json(detailspath)

    apf_id = details.get(Chosen_apf)
    aef_ids = [details.get(aef_key) for aef_key in Chosen_aefs]

    # Every requested AEF must resolve to an id: an unresolved entry would
    # otherwise be serialized into the config as null and sent on publish.
    if not apf_id or not aef_ids or not all(aef_ids):
        raise ValueError("No se encontraron todos los valores necesarios en 'Capif_provider_details.json'")

    # Store the resolved ids in the publish request section of the config.
    config['publish_req']['publisher_apf_id'] = apf_id
    config['publish_req']['publisher_aefs_ids'] = aef_ids
    _write_json(capif_sdk_config_path, config)

    print("Archivo de configuración actualizado correctamente.")

    # The connector is built from the config file, so it is created only
    # after the updated configuration has been saved.
    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)

    capif_provider_connector.publish_services()

    print("PROVIDER PUBLISH COMPLETED")

    published_apis_path = os.path.join(provider_folder, username_folder, "Published-Apis.json")
    if not os.path.exists(published_apis_path):
        raise FileNotFoundError(f"No se encontró el archivo {published_apis_path}")

    published_apis = _read_json(published_apis_path)
    service_api_id = published_apis.get('Test-2' if second_netapp_api else 'Test-3')

    # Re-read the config from disk instead of reusing the dict loaded above,
    # so anything written to the file during publishing is not overwritten.
    config = _read_json(capif_sdk_config_path)
    config['publish_req']['service_api_id'] = service_api_id
    _write_json(capif_sdk_config_path, config)

    capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)

    capif_provider_connector.update_service()

    print("PROVIDER UPDATE SERVICE COMPLETED")

    capif_provider_connector.get_all_services()

    print("PROVIDER GET ALL SERVICES COMPLETED")

    capif_provider_connector.get_service()

    print("PROVIDER GET SERVICE COMPLETED")


if __name__ == "__main__":
    # End-to-end run against the SDK: provider onboarding, publish, update and
    # queries; invoker onboarding, discovery, tokens, update and offboarding;
    # unpublish; two provider updates; and finally provider offboarding.
    # Any failure is reported and the process exits with status 1 so that a
    # shell or CI job running this script can detect it.
    try:
        # Create the connector and onboard the provider.
        capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)
        capif_provider_connector.onboard_provider()
        print("PROVIDER ONBOARDING COMPLETED")

        # Fetch the AEF and APF ids needed to publish an API.
        with open(capif_sdk_config_path, 'r') as file:
            config = json.load(file)

        provider_folder = config.get('provider_folder')
        username_folder = config.get('capif_username')
        if not provider_folder:
            raise ValueError("El valor 'provider_folder' no está definido en el archivo de configuración.")

        detailspath = os.path.join(provider_folder, username_folder, "capif_provider_details.json")
        if not os.path.exists(detailspath):
            raise FileNotFoundError(f"No se encontró el archivo {detailspath}")

        with open(detailspath, 'r') as file:
            details = json.load(file)

        APF = details.get('APF-1_api_prov_func_id')
        AEF1 = details.get('AEF-1_api_prov_func_id')
        AEF2 = details.get('AEF-2_api_prov_func_id')

        if not APF or not AEF1 or not AEF2:
            raise ValueError("No se encontraron todos los valores necesarios en 'Capif_provider_details.json'")

        # Update the configuration file with the publisher ids.
        config['publish_req']['publisher_apf_id'] = APF
        config['publish_req']['publisher_aefs_ids'] = [AEF1, AEF2]

        with open(capif_sdk_config_path, 'w') as file:
            json.dump(config, file, indent=4)  # Save the JSON with indentation

        print("Archivo de configuración actualizado correctamente.")
        # Rebuild the connector so it is constructed from the updated config file.
        capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)

        capif_provider_connector.publish_services()

        print("PROVIDER PUBLISH COMPLETED")

        PublishedApis = os.path.join(provider_folder, username_folder, "Published-Apis.json")
        if not os.path.exists(PublishedApis):
            raise FileNotFoundError(f"No se encontró el archivo {PublishedApis}")

        # The name is rebound from the file path to the parsed file content.
        with open(PublishedApis, 'r') as file:
            PublishedApis = json.load(file)
        service_api_id = PublishedApis.get('Test')

        # Re-read the config from disk before adding the service id.
        with open(capif_sdk_config_path, 'r') as file:
            config = json.load(file)

        config['publish_req']['service_api_id'] = service_api_id

        with open(capif_sdk_config_path, 'w') as file:
            json.dump(config, file, indent=4)  # Save the JSON with indentation

        capif_provider_connector = CAPIFProviderConnector(config_file=capif_sdk_config_path)

        capif_provider_connector.update_service()

        print("PROVIDER UPDATE COMPLETED")

        capif_provider_connector.get_all_services()

        print("PROVIDER GET ALL SERVICES COMPLETED")

        capif_provider_connector.get_service()

        print("PROVIDER GET SERVICE COMPLETED")

        capif_invoker_connector = CAPIFInvokerConnector(config_file=capif_sdk_config_path)

        capif_invoker_connector.onboard_invoker()
        print("INVOKER ONBOARDING COMPLETED")

        discoverer = ServiceDiscoverer(config_file=capif_sdk_config_path)

        discoverer.discover()

        print("SERVICE DISCOVER COMPLETED")

        discoverer.get_tokens()

        print("SERVICE GET TOKENS COMPLETED")

        capif_invoker_connector.update_invoker()

        print("INVOKER UPDATE SERVICE COMPLETED")

        capif_invoker_connector.offboard_invoker()

        print("INVOKER OFFBOARD COMPLETED")

        capif_provider_connector.unpublish_service()

        print("PROVIDER UNPUBLISH SERVICE COMPLETED")

        # First provider update: the config is rewritten with apfs="2",
        # aefs="4" and the second API description before updating.
        capif_provider_connector = preparation_for_update("2", "4", True)

        capif_provider_connector.update_provider()

        Chosen_apf = "APF-2_api_prov_func_id"

        Chosen_aefs = ["AEF-1_api_prov_func_id", "AEF-3_api_prov_func_id", "AEF-4_api_prov_func_id"]

        ensure_update(Chosen_apf, Chosen_aefs, True)

        print("PROVIDER UPDATE ONE COMPLETED")

        # Second provider update: the config is rewritten with apfs="1",
        # aefs="2" and the third API description before updating.
        capif_provider_connector = preparation_for_update("1", "2", False)

        capif_provider_connector.update_provider()

        Chosen_apf = "APF-1_api_prov_func_id"

        Chosen_aefs = ["AEF-1_api_prov_func_id", "AEF-2_api_prov_func_id"]

        ensure_update(Chosen_apf, Chosen_aefs, False)

        print("PROVIDER UPDATE TWO COMPLETED")

        capif_provider_connector.offboard_provider()

        print("PROVIDER OFFBOARDING COMPLETED")

        print("ALL TEST PASSED CORRECTLY")

    except FileNotFoundError as e:
        print(f"Error: {e}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error al leer el archivo JSON: {e}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary of the script: report the failure, then exit
        # non-zero instead of ending the run with a success status.
        print(f"Error inesperado: {e}")
        sys.exit(1)
JorgeEcheva26's avatar
JorgeEcheva26 committed