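"""test.py - end-to-end exercise of the CAPIF SDK connectors.

When run as a script it onboards a provider, publishes/updates/queries/
unpublishes a service, onboards an invoker, discovers services and fetches
tokens, updates and offboards the invoker, performs two provider updates,
and finally offboards the provider.
"""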
import sys
import os
import json
# flake8: noqa

# Make the SDK importable without installing it: add the sibling 'sdk'
# directory (one level above this script's directory) to the front of
# sys.path so it takes precedence over any installed copy.
script_dir = os.path.dirname(os.path.abspath(__file__))  # Current script directory
sdk_path = os.path.join(script_dir, '..', 'sdk')  # Go up one level and point to 'sdk'
sys.path.insert(0, sdk_path)

# Must come after the sys.path tweak above.
from sdk import capif_invoker_connector, capif_provider_connector, service_discoverer

# SDK configuration file handed to every connector created by this script
# (relative to the current working directory).
capif_sdk_config_path = "./capif_sdk_config_sample_test.json"


def preparation_for_update(APFs, AEFs, second_network_app_api, capif_provider_connector):
    """Configure a provider connector ahead of an ``update_provider`` call.

    Args:
        APFs: Value stored on the connector's ``apfs`` attribute
            (callers in this file pass an integer - presumably the number
            of APFs; confirm against the SDK).
        AEFs: Value stored on the connector's ``aefs`` attribute
            (same caveat as ``APFs``).
        second_network_app_api: When truthy, point the connector at
            ``network_app_provider_api_spec_2.json``; otherwise at
            ``network_app_provider_api_spec_3.json``.
        capif_provider_connector: Connector instance to mutate in place.

    Returns:
        The same connector instance that was passed in.
    """
    capif_provider_connector.apfs = APFs
    capif_provider_connector.aefs = AEFs
    capif_provider_connector.api_description_path = (
        "./network_app_provider_api_spec_2.json"
        if second_network_app_api
        else "./network_app_provider_api_spec_3.json"
    )
    return capif_provider_connector


def _load_json_file(path):
    """Return the parsed JSON document stored at *path*.

    Raises:
        FileNotFoundError: If *path* does not exist.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"File {path} not found")
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)


def ensure_update(Chosen_apf, Chosen_aefs, second_network_app_api, capif_provider_connector):
    """Publish, update, query and unpublish a service after a provider update.

    The APF/AEF ids are looked up by name in ``capif_provider_details.json``
    inside the connector's ``provider_folder`` and written into
    ``publish_req`` before publishing.  The id of the published API is then
    read from ``published-Apis.json`` (entry ``'Test-two'`` when
    ``second_network_app_api`` is truthy, ``'Test-three'`` otherwise) and
    used for the update/get/unpublish calls.

    Args:
        Chosen_apf: Key of the APF entry in ``capif_provider_details.json``.
        Chosen_aefs: Sequence of keys of the AEF entries to publish with.
            Every entry is looked up and must resolve to a value.
        second_network_app_api: Selects which published API entry is used.
        capif_provider_connector: Provider connector to drive.

    Returns:
        The same connector instance that was passed in.

    Raises:
        ValueError: If ``provider_folder`` is unset, if any requested APF/AEF
            id is missing from the details file, or if the expected API is
            missing from ``published-Apis.json``.
        FileNotFoundError: If either JSON file does not exist.
    """
    provider_folder = capif_provider_connector.provider_folder
    if not provider_folder:
        raise ValueError("'provider_folder' value is not defined in the configuration file.")

    details = _load_json_file(os.path.join(provider_folder, "capif_provider_details.json"))

    # Resolve every requested name; a single unresolved name (including the
    # last AEF) must abort before anything is published.
    apf_id = details.get(Chosen_apf)
    aef_ids = [details.get(aef_name) for aef_name in Chosen_aefs]
    if not apf_id or not aef_ids or not all(aef_ids):
        raise ValueError("Not all necessary values were found in 'capif_provider_details.json'")

    # Update the publish request with the resolved ids.
    capif_provider_connector.publish_req['publisher_apf_id'] = apf_id
    capif_provider_connector.publish_req['publisher_aefs_ids'] = aef_ids

    capif_provider_connector.publish_services()

    print("PROVIDER PUBLISH COMPLETED")

    published_apis = _load_json_file(os.path.join(provider_folder, "published-Apis.json"))

    api_name = 'Test-two' if second_network_app_api else 'Test-three'
    service_api_id = published_apis.get(api_name)
    if not service_api_id:
        # Fail here with a clear message instead of sending a None id on.
        raise ValueError(f"API '{api_name}' was not found in 'published-Apis.json'")

    capif_provider_connector.publish_req['service_api_id'] = service_api_id

    capif_provider_connector.update_service()

    print("PROVIDER UPDATE SERVICE COMPLETED")

    capif_provider_connector.get_all_services()

    print("PROVIDER GET ALL SERVICES COMPLETED")

    capif_provider_connector.get_service()

    print("PROVIDER GET SERVICE COMPLETED")

    capif_provider_connector.unpublish_service()

    return capif_provider_connector


def main():
    """Run the full provider/invoker scenario against the CAPIF SDK.

    Steps, in order: provider onboarding; publish, update, list and get of
    the 'Test' API; invoker onboarding; service discovery and token
    retrieval; invoker update and offboarding; unpublish; two provider
    updates (each followed by ``ensure_update``); provider offboarding.

    Returns:
        0 when every step completed, 1 when any step raised.  The error is
        printed before returning so the failure is visible in the output.
    """
    try:
        # Initialization of the connector.  Local names are used so the
        # imported connector classes are not shadowed by their instances.
        provider = capif_provider_connector(config_file=capif_sdk_config_path)
        provider.onboard_provider()
        print("PROVIDER ONBOARDING COMPLETED")

        # Get AEFs ids and APFs ids to publish an API
        provider_folder = provider.provider_folder
        if not provider_folder:
            raise ValueError("'provider_folder' value is not defined in the configuration file.")

        detailspath = os.path.join(provider_folder, "capif_provider_details.json")
        if not os.path.exists(detailspath):
            raise FileNotFoundError(f"File {detailspath} not found")

        with open(detailspath, 'r', encoding='utf-8') as file:
            details = json.load(file)

        APF = details.get('APF-1')
        AEF1 = details.get('AEF-1')
        AEF2 = details.get('AEF-2')

        if not APF or not AEF1 or not AEF2:
            raise ValueError("Not all necessary values were found in 'capif_provider_details.json'")

        # Update the publish request
        provider.publish_req['publisher_apf_id'] = APF
        provider.publish_req['publisher_aefs_ids'] = [AEF1, AEF2]

        provider.publish_services()

        print("PROVIDER PUBLISH COMPLETED")

        published_apis_path = os.path.join(provider_folder, "published-Apis.json")

        with open(published_apis_path, 'r', encoding='utf-8') as file:
            published_apis = json.load(file)
        service_api_id = published_apis.get('Test')
        if not service_api_id:
            # Fail here with a clear message instead of sending a None id on.
            raise ValueError("API 'Test' was not found in 'published-Apis.json'")

        provider.publish_req['service_api_id'] = service_api_id

        provider.update_service()

        print("PROVIDER UPDATE COMPLETED")

        provider.get_all_services()

        print("PROVIDER GET ALL SERVICES COMPLETED")

        provider.get_service()

        print("PROVIDER GET SERVICE COMPLETED")

        invoker = capif_invoker_connector(config_file=capif_sdk_config_path)

        invoker.onboard_invoker()
        print("INVOKER ONBOARDING COMPLETED")

        discoverer = service_discoverer(config_file=capif_sdk_config_path)

        discoverer.discover()

        print("SERVICE DISCOVER COMPLETED")

        discoverer.get_tokens()

        print("SERVICE GET TOKENS COMPLETED")

        invoker.update_invoker()

        print("INVOKER UPDATE SERVICE COMPLETED")

        invoker.offboard_invoker()

        print("INVOKER OFFBOARD COMPLETED")

        provider.unpublish_service()

        print("PROVIDER UNPUBLISH SERVICE COMPLETED")

        # First provider update: 2 APFs / 4 AEFs, second API spec.
        provider = preparation_for_update(2, 4, True, provider)

        provider.update_provider()

        Chosen_apf = "APF-2"

        Chosen_aefs = ["AEF-1", "AEF-3", "AEF-4"]

        provider = ensure_update(Chosen_apf, Chosen_aefs, True, provider)

        print("PROVIDER UPDATE ONE COMPLETED")

        # Second provider update: back to 1 APF / 2 AEFs, third API spec.
        provider = preparation_for_update(1, 2, False, provider)

        provider.update_provider()

        Chosen_apf = "APF-1"

        Chosen_aefs = ["AEF-1", "AEF-2"]

        provider = ensure_update(Chosen_apf, Chosen_aefs, False, provider)

        print("PROVIDER UPDATE TWO COMPLETED")

        provider.offboard_provider()

        print("PROVIDER OFFBOARDING COMPLETED")

        print("ALL TESTS PASSED CORRECTLY")

    except FileNotFoundError as e:
        print(f"Error: {e}")
        return 1
    except json.JSONDecodeError as e:
        print(f"Error reading the JSON file: {e}")
        return 1
    except Exception as e:
        # Top-level boundary of a test script: report and signal failure.
        print(f"Unexpected error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate failure through the process exit status so callers (CI,
    # shell scripts) can detect a failed run.
    sys.exit(main())