import json # flake8: noqa from opencapif_sdk import capif_invoker_connector, capif_provider_connector, service_discoverer, capif_logging_feature, capif_invoker_event_feature, capif_provider_event_feature capif_sdk_config_path = "./capif_sdk_config_release_2.json" def preparation_for_update(APFs, AEFs, second_network_app_api,capif_provider_connector): capif_provider_connector.apfs = APFs capif_provider_connector.aefs = AEFs if second_network_app_api: capif_provider_connector.api_description_path = "./network_app_provider_api_spec.json" else: capif_provider_connector.api_description_path = "./network_app_provider_api_spec_3.json" return capif_provider_connector def ensure_update(chosen_apf, chosen_aefs, second_network_app_api,capif_provider_connector): if second_network_app_api: # Get AEFs ids and APFs ids to publish an APi APF = capif_provider_connector.provider_capif_ids[chosen_apf] AEF1 = capif_provider_connector.provider_capif_ids[chosen_aefs[0]] AEF2 = capif_provider_connector.provider_capif_ids[chosen_aefs[1]] if not APF or not AEF1 or not AEF2: raise ValueError("Not all necessary values were found in 'provider_service_ids.json'") # Update configuration file capif_provider_connector.publish_req['publisher_apf_id'] = APF capif_provider_connector.publish_req['publisher_aefs_ids'] = [AEF1, AEF2] else: APF = capif_provider_connector.provider_capif_ids['APF-1'] AEF1 = capif_provider_connector.provider_capif_ids['AEF-1'] AEF2 = capif_provider_connector.provider_capif_ids['AEF-2'] capif_provider_connector.publish_req['publisher_apf_id'] = APF capif_provider_connector.publish_req['publisher_aefs_ids'] = [AEF1, AEF2] capif_provider_connector.publish_services() if second_network_app_api: service_api_id = capif_provider_connector.provider_service_ids['Testtrece'] else: service_api_id = capif_provider_connector.provider_service_ids['Test-three'] capif_provider_connector.publish_req['service_api_id'] = service_api_id capif_provider_connector.update_service() print("PROVIDER UPDATE SERVICE COMPLETED") capif_provider_connector.get_all_services() print("PROVIDER GET ALL SERVICES COMPLETED") capif_provider_connector.get_service() print("PROVIDER GET SERVICE COMPLETED") capif_provider_connector.unpublish_service() return capif_provider_connector if __name__ == "__main__": try: # Initialization of the connector capif_provider_connector = capif_provider_connector(config_file=capif_sdk_config_path) capif_provider_connector.onboard_provider() print("PROVIDER ONBOARDING COMPLETED") # Get AEFs ids and APFs ids to publish an API APF1 = capif_provider_connector.provider_capif_ids['APF-1'] APF2 = capif_provider_connector.provider_capif_ids['APF-2'] AEF1 = capif_provider_connector.provider_capif_ids['AEF-1'] AEF2 = capif_provider_connector.provider_capif_ids['AEF-2'] AEF3 = capif_provider_connector.provider_capif_ids['AEF-3'] capif_provider_connector.api_description_path="test1.json" # Update configuration file capif_provider_connector.publish_req['publisher_apf_id'] = APF1 capif_provider_connector.publish_req['publisher_aefs_ids'] = [AEF1] event_provider = capif_provider_event_feature(config_file=capif_sdk_config_path) event_provider.create_subscription(name="Ejemplo1",id=AEF2) event_provider.create_subscription(name="Ejemplo2",id=APF1) event_provider.delete_subscription(name="Ejemplo1",id=AEF2) event_provider.delete_subscription(name="Ejemplo2",id=APF1) capif_provider_connector.publish_services() print("PROVIDER PUBLISH COMPLETED") service_api_id = capif_provider_connector.provider_service_ids["API of dummy Network-App to test"] capif_provider_connector.publish_req['service_api_id'] = service_api_id capif_provider_connector.update_service() print("PROVIDER UPDATE COMPLETED") capif_provider_connector.get_all_services() print("PROVIDER GET ALL SERVICES COMPLETED") capif_provider_connector.get_service() print("PROVIDER GET SERVICE COMPLETED") capif_invoker_connector = capif_invoker_connector(config_file=capif_sdk_config_path) capif_invoker_connector.onboard_invoker() print("INVOKER ONBOARDING COMPLETED") discoverer = service_discoverer(config_file=capif_sdk_config_path) discoverer.discover() print("SERVICE DISCOVER COMPLETED") discoverer.get_tokens() print("SERVICE GET TOKENS COMPLETED") logger=capif_logging_feature(config_file=capif_sdk_config_path) logger.create_logs(aefId=AEF1, jwt=discoverer.token) event_invoker = capif_invoker_event_feature(config_file=capif_sdk_config_path) event_invoker.create_subscription(name="Ejemplo3") event_invoker.create_subscription(name="Ejemplo4") event_invoker.delete_subscription(name="Ejemplo3") event_invoker.delete_subscription(name="Ejemplo4") capif_invoker_connector.update_invoker() print("INVOKER UPDATE SERVICE COMPLETED") capif_invoker_connector.offboard_invoker() print("INVOKER OFFBOARD COMPLETED") capif_provider_connector.unpublish_service() print("PROVIDER UNPUBLISH SERVICE COMPLETED") capif_provider_connector = preparation_for_update(2, 4, True,capif_provider_connector) capif_provider_connector.update_provider() chosen_apf = "APF-2" chosen_aefs = ["AEF-1", "AEF-3", "AEF-4"] capif_provider_connector = ensure_update(chosen_apf, chosen_aefs, True,capif_provider_connector) print("PROVIDER UPDATE ONE COMPLETED") capif_provider_connector = preparation_for_update(1, 2, False,capif_provider_connector) capif_provider_connector.update_provider() chosen_apf = "APF-1" chosen_aefs = ["AEF-1", "AEF-2"] capif_provider_connector = ensure_update(chosen_apf, chosen_aefs, False,capif_provider_connector) print("PROVIDER UPDATE TWO COMPLETED") capif_provider_connector.offboard_provider() print("PROVIDER OFFBOARDING COMPLETED") print("ALL TESTS PASSED CORRECTLY") except FileNotFoundError as e: print(f"Error: {e}") except json.JSONDecodeError as e: print(f"Error reading the JSON file: {e}") except Exception as e: print(f"Unexpected error: {e}")