Loading tests/features/CAPIF Security Api/capif_security_api.robot +65 −0 Original line number Diff line number Diff line Loading @@ -1252,3 +1252,68 @@ Retrieve access token with invalid apiName at scope Check Response Variable Type And Values ${resp} 400 AccessTokenErr ... error=invalid_scope ... error_description=One of the api names does not exist or is not associated with the aef id provided Retrieve the Security Context of an API Invoker for PKI security method [Tags] capif_security_api-28 smoke # Default Invoker Registration and Onboarding ${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding # Register Provider ${register_user_info_provider}= Provider Default Registration # Publish Service API # Create list with security methods ${security_methods}= Create List PKI ${service_api_description_published_1} ${resource_url} ${request_body}= Publish Service Api ... ${register_user_info_provider} ... service_1 ... security_methods=${security_methods} # Store apiId1 ${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']} # Create Security Context ${request_body}= Create Service Security Default Body ... ${NOTIFICATION_DESTINATION_URL} ... aef_id=${register_user_info_provider['aef_id']} ... api_id=${service_api_id_1} ${resp}= Put Request Capif ... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']} ... json=${request_body} ... server=${CAPIF_HTTPS_URL} ... verify=ca.crt ... username=${INVOKER_USERNAME} Check Response Variable Type And Values ${resp} 201 ServiceSecurity ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} ${service_security_context}= Set Variable ${resp.json()} # Retrieve Security context can setup by parameters if authenticationInfo and authorizationInfo are needed at response. ${resp}= Get Request Capif ... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']}?authenticationInfo=true&authorizationInfo=true ... server=${CAPIF_HTTPS_URL} ... verify=ca.crt ... username=${AEF_PROVIDER_USERNAME} # Check Results Check Response Variable Type And Values ${resp} 200 ServiceSecurity # Response must accomplish: # aefProfile must contain authenticationInfo with CA root certificate # aefProfile must NOT CONTAIN authorizationInfo # Create security_info to compare with response ## Read CA root certificate ${ca_root}= Read File Utf8 ca.crt ## Create a securityInfo with authenticationInfo with CA root certificate ${security_info_expected}= Add Key To Object ${service_security_context['securityInfo'][0]} authenticationInfo ${ca_root} ## Create List of securityInfo ${security_info_list}= Create List ${security_info_expected} ## Set expected securityInfo list in service_security_context ${service_security_context_filtered}= Add Key To Object ${service_security_context} securityInfo ${security_info_list} Log Dictionary ${service_security_context_filtered} # Check Results Dictionaries Should Be Equal ${resp.json()} ${service_security_context_filtered} tests/libraries/api_publish_service/bodyRequests.py +69 −33 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ def create_service_api_description(api_name="service_1", vendor_specific_service_api_description=None, vendor_specific_aef_profile=None, api_status=None, security_method="default", security_methods="default", domain_name=None, interface_descriptions=None): aef_ids = list() Loading @@ -15,30 +15,31 @@ def create_service_api_description(api_name="service_1", print("aef_id parameter is a string") aef_ids.append(aef_id) security_methods = list() if security_method is not None: if isinstance(security_method, list): print("security_method parameter is a list") if len(security_method) > 0: if isinstance(security_method[0], list): security_methods = security_method security_methods_normalized = list() if security_methods is not None: if isinstance(security_methods, list): print("security_methods parameter is a list") if len(security_methods) > 0: if isinstance(security_methods[0], list): security_methods_normalized = security_methods else: security_methods.append(security_method) elif isinstance(security_method, str): security_methods_normalized.append(security_methods) elif isinstance(security_methods, str): print("security_methods parameter is a string") if security_method == "default": if security_methods == "default": for idx in range(len(aef_ids)): security_methods.append(["OAUTH"]) security_methods.append([security_method]) security_methods_normalized.append(["OAUTH"]) else: print(f"security_method is {security_method}") print(f"security_methods: {security_methods}") security_methods_normalized.append([security_methods]) else: print("security_method parameter is None") print(f"security_methods is {security_methods}") print(f"security_methods_normalized: {security_methods_normalized}") else: print("security_methods parameter is None") profiles = create_aef_profiles( aef_ids, security_methods, security_methods_normalized, domain_name, interface_descriptions) Loading Loading @@ -142,11 +143,12 @@ def create_aef_profile(aef_id, data['interfaceDescriptions'] = interface_descriptions elif domain_name is None and interface_descriptions is None: data['interfaceDescriptions'] = [ { "ipv4Addr": "string", "port": 65535, "securityMethods": security_method } create_interface_description( ipv4_addr="string", port=65535, security_methods=security_method, grant_types=["CLIENT_CREDENTIALS"] ) ] if security_method is not None: Loading @@ -162,7 +164,7 @@ def create_service_api_description_patch(aef_id=None, api_supp_feats=None, pub_api_path=None, ccf_id=None, security_method=None, security_methods=None, domain_name=None, interface_descriptions=None): body = dict() Loading @@ -178,23 +180,23 @@ def create_service_api_description_patch(aef_id=None, print("aef_id parameter is a string") aef_ids.append(aef_id) security_methods = list() if security_method is not None: if isinstance(security_method, list): print("security_method parameter is a list") if len(security_method) > 0: if isinstance(security_method[0], list): security_methods = security_method security_methods_normalized = list() if security_methods is not None: if isinstance(security_methods, list): print("security_methods parameter is a list") if len(security_methods) > 0: if isinstance(security_methods[0], list): security_methods_normalized = security_methods else: security_methods.append(security_method) security_methods_normalized.append(security_methods) elif isinstance(security_methods, str): print("security_methods parameter is a string") security_methods.append([security_methods]) security_methods_normalized.append([security_methods]) if aef_ids is not None: profiles = create_aef_profiles( aef_ids, security_methods, security_methods_normalized, domain_name, interface_descriptions) body['aefProfiles'] = profiles Loading Loading @@ -240,3 +242,37 @@ def create_service_api_description_patch(aef_id=None, def get_value(lst, index): return lst[index] if index < len(lst) else None def create_interface_description(ipv4_addr=None, ipv6_addr=None, fqdn=None, port=None, api_prefix=None, security_methods=None, grant_types=None): """ Create an interface description with the given parameters. """ # Create the interface description dictionary data = dict() if ipv4_addr is not None: data['ipv4Addr'] = ipv4_addr elif ipv6_addr is not None: data['ipv6Addr'] = ipv6_addr elif fqdn is not None: data['fqdn'] = fqdn else: raise ValueError( "At least one of ipv4_addr, ipv6_addr, or fqdn must be provided.") if port is not None: data['port'] = port if api_prefix is not None: data['apiPrefix'] = api_prefix if security_methods is not None: data['securityMethods'] = security_methods if grant_types is not None: data['grantTypes'] = grant_types # Return the interface description return data tests/libraries/helpers.py +12 −0 Original line number Diff line number Diff line Loading @@ -50,6 +50,12 @@ def store_in_file(file_path, data): f.write(bytes(data, 'utf-8')) f.close() def read_file_utf8(file_path: str) -> str: try: with open(file_path, 'r', encoding='utf-8') as file: return file.read() except Exception as error: raise Exception(f"Error al leer el archivo: {error}") def cert_tuple(cert_file, key_file): return (cert_file, key_file) Loading Loading @@ -134,6 +140,12 @@ def remove_key_from_object(input, key_to_remove): return input_copy def add_key_to_object(input, key_to_add, value_to_add): input_copy = copy.deepcopy(input) input_copy[key_to_add] = value_to_add return input_copy def create_scope(aef_id, api_name): data = "3gpp#" + aef_id + ":" + api_name Loading tests/resources/common/basicRequests.robot +13 −0 Original line number Diff line number Diff line Loading @@ -796,6 +796,9 @@ Publish Service Api Request ... ${vendor_specific_aef_profile}=${None} ... ${aef_id}=${NONE} ... ${api_status}=${NONE} ... ${security_methods}=default ... ${domain_name}=${NONE} ... ${interface_descriptions}=${NONE} ${aef_ids}= Create List IF "${aef_id}" == "${NONE}" Loading Loading @@ -826,6 +829,10 @@ Publish Service Api Request ... ${vendor_specific_service_api_description} ... ${vendor_specific_aef_profile} ... ${api_status} ... ${security_methods} ... ${domain_name} ... ${interface_descriptions} ${resp}= Post Request Capif ... /published-apis/v1/${apf_id_to_use}/service-apis ... json=${request_body} Loading @@ -846,6 +853,9 @@ Publish Service Api ... ${vendor_specific_aef_profile}=${None} ... ${aef_id}=${NONE} ... ${api_status}=${NONE} ... ${security_methods}=default ... ${domain_name}=${NONE} ... ${interface_descriptions}=${NONE} ${resp} ${request_body}= Publish Service Api Request ... ${register_user_info_provider} Loading @@ -857,6 +867,9 @@ Publish Service Api ... ${vendor_specific_aef_profile} ... ${aef_id} ... ${api_status} ... ${security_methods} ... ${domain_name} ... ${interface_descriptions} Check Response Variable Type And Values ${resp} 201 ServiceAPIDescription Dictionary Should Contain Key ${resp.json()} apiId Loading Loading
tests/features/CAPIF Security Api/capif_security_api.robot +65 −0 Original line number Diff line number Diff line Loading @@ -1252,3 +1252,68 @@ Retrieve access token with invalid apiName at scope Check Response Variable Type And Values ${resp} 400 AccessTokenErr ... error=invalid_scope ... error_description=One of the api names does not exist or is not associated with the aef id provided Retrieve the Security Context of an API Invoker for PKI security method [Tags] capif_security_api-28 smoke # Default Invoker Registration and Onboarding ${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding # Register Provider ${register_user_info_provider}= Provider Default Registration # Publish Service API # Create list with security methods ${security_methods}= Create List PKI ${service_api_description_published_1} ${resource_url} ${request_body}= Publish Service Api ... ${register_user_info_provider} ... service_1 ... security_methods=${security_methods} # Store apiId1 ${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']} # Create Security Context ${request_body}= Create Service Security Default Body ... ${NOTIFICATION_DESTINATION_URL} ... aef_id=${register_user_info_provider['aef_id']} ... api_id=${service_api_id_1} ${resp}= Put Request Capif ... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']} ... json=${request_body} ... server=${CAPIF_HTTPS_URL} ... verify=ca.crt ... username=${INVOKER_USERNAME} Check Response Variable Type And Values ${resp} 201 ServiceSecurity ${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX} ${service_security_context}= Set Variable ${resp.json()} # Retrieve Security context can setup by parameters if authenticationInfo and authorizationInfo are needed at response. ${resp}= Get Request Capif ... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']}?authenticationInfo=true&authorizationInfo=true ... server=${CAPIF_HTTPS_URL} ... verify=ca.crt ... username=${AEF_PROVIDER_USERNAME} # Check Results Check Response Variable Type And Values ${resp} 200 ServiceSecurity # Response must accomplish: # aefProfile must contain authenticationInfo with CA root certificate # aefProfile must NOT CONTAIN authorizationInfo # Create security_info to compare with response ## Read CA root certificate ${ca_root}= Read File Utf8 ca.crt ## Create a securityInfo with authenticationInfo with CA root certificate ${security_info_expected}= Add Key To Object ${service_security_context['securityInfo'][0]} authenticationInfo ${ca_root} ## Create List of securityInfo ${security_info_list}= Create List ${security_info_expected} ## Set expected securityInfo list in service_security_context ${service_security_context_filtered}= Add Key To Object ${service_security_context} securityInfo ${security_info_list} Log Dictionary ${service_security_context_filtered} # Check Results Dictionaries Should Be Equal ${resp.json()} ${service_security_context_filtered}
tests/libraries/api_publish_service/bodyRequests.py +69 −33 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ def create_service_api_description(api_name="service_1", vendor_specific_service_api_description=None, vendor_specific_aef_profile=None, api_status=None, security_method="default", security_methods="default", domain_name=None, interface_descriptions=None): aef_ids = list() Loading @@ -15,30 +15,31 @@ def create_service_api_description(api_name="service_1", print("aef_id parameter is a string") aef_ids.append(aef_id) security_methods = list() if security_method is not None: if isinstance(security_method, list): print("security_method parameter is a list") if len(security_method) > 0: if isinstance(security_method[0], list): security_methods = security_method security_methods_normalized = list() if security_methods is not None: if isinstance(security_methods, list): print("security_methods parameter is a list") if len(security_methods) > 0: if isinstance(security_methods[0], list): security_methods_normalized = security_methods else: security_methods.append(security_method) elif isinstance(security_method, str): security_methods_normalized.append(security_methods) elif isinstance(security_methods, str): print("security_methods parameter is a string") if security_method == "default": if security_methods == "default": for idx in range(len(aef_ids)): security_methods.append(["OAUTH"]) security_methods.append([security_method]) security_methods_normalized.append(["OAUTH"]) else: print(f"security_method is {security_method}") print(f"security_methods: {security_methods}") security_methods_normalized.append([security_methods]) else: print("security_method parameter is None") print(f"security_methods is {security_methods}") print(f"security_methods_normalized: {security_methods_normalized}") else: print("security_methods parameter is None") profiles = create_aef_profiles( aef_ids, security_methods, security_methods_normalized, domain_name, interface_descriptions) Loading Loading @@ -142,11 +143,12 @@ def create_aef_profile(aef_id, data['interfaceDescriptions'] = interface_descriptions elif domain_name is None and interface_descriptions is None: data['interfaceDescriptions'] = [ { "ipv4Addr": "string", "port": 65535, "securityMethods": security_method } create_interface_description( ipv4_addr="string", port=65535, security_methods=security_method, grant_types=["CLIENT_CREDENTIALS"] ) ] if security_method is not None: Loading @@ -162,7 +164,7 @@ def create_service_api_description_patch(aef_id=None, api_supp_feats=None, pub_api_path=None, ccf_id=None, security_method=None, security_methods=None, domain_name=None, interface_descriptions=None): body = dict() Loading @@ -178,23 +180,23 @@ def create_service_api_description_patch(aef_id=None, print("aef_id parameter is a string") aef_ids.append(aef_id) security_methods = list() if security_method is not None: if isinstance(security_method, list): print("security_method parameter is a list") if len(security_method) > 0: if isinstance(security_method[0], list): security_methods = security_method security_methods_normalized = list() if security_methods is not None: if isinstance(security_methods, list): print("security_methods parameter is a list") if len(security_methods) > 0: if isinstance(security_methods[0], list): security_methods_normalized = security_methods else: security_methods.append(security_method) security_methods_normalized.append(security_methods) elif isinstance(security_methods, str): print("security_methods parameter is a string") security_methods.append([security_methods]) security_methods_normalized.append([security_methods]) if aef_ids is not None: profiles = create_aef_profiles( aef_ids, security_methods, security_methods_normalized, domain_name, interface_descriptions) body['aefProfiles'] = profiles Loading Loading @@ -240,3 +242,37 @@ def create_service_api_description_patch(aef_id=None, def get_value(lst, index): return lst[index] if index < len(lst) else None def create_interface_description(ipv4_addr=None, ipv6_addr=None, fqdn=None, port=None, api_prefix=None, security_methods=None, grant_types=None): """ Create an interface description with the given parameters. """ # Create the interface description dictionary data = dict() if ipv4_addr is not None: data['ipv4Addr'] = ipv4_addr elif ipv6_addr is not None: data['ipv6Addr'] = ipv6_addr elif fqdn is not None: data['fqdn'] = fqdn else: raise ValueError( "At least one of ipv4_addr, ipv6_addr, or fqdn must be provided.") if port is not None: data['port'] = port if api_prefix is not None: data['apiPrefix'] = api_prefix if security_methods is not None: data['securityMethods'] = security_methods if grant_types is not None: data['grantTypes'] = grant_types # Return the interface description return data
tests/libraries/helpers.py +12 −0 Original line number Diff line number Diff line Loading @@ -50,6 +50,12 @@ def store_in_file(file_path, data): f.write(bytes(data, 'utf-8')) f.close() def read_file_utf8(file_path: str) -> str: try: with open(file_path, 'r', encoding='utf-8') as file: return file.read() except Exception as error: raise Exception(f"Error al leer el archivo: {error}") def cert_tuple(cert_file, key_file): return (cert_file, key_file) Loading Loading @@ -134,6 +140,12 @@ def remove_key_from_object(input, key_to_remove): return input_copy def add_key_to_object(input, key_to_add, value_to_add): input_copy = copy.deepcopy(input) input_copy[key_to_add] = value_to_add return input_copy def create_scope(aef_id, api_name): data = "3gpp#" + aef_id + ":" + api_name Loading
tests/resources/common/basicRequests.robot +13 −0 Original line number Diff line number Diff line Loading @@ -796,6 +796,9 @@ Publish Service Api Request ... ${vendor_specific_aef_profile}=${None} ... ${aef_id}=${NONE} ... ${api_status}=${NONE} ... ${security_methods}=default ... ${domain_name}=${NONE} ... ${interface_descriptions}=${NONE} ${aef_ids}= Create List IF "${aef_id}" == "${NONE}" Loading Loading @@ -826,6 +829,10 @@ Publish Service Api Request ... ${vendor_specific_service_api_description} ... ${vendor_specific_aef_profile} ... ${api_status} ... ${security_methods} ... ${domain_name} ... ${interface_descriptions} ${resp}= Post Request Capif ... /published-apis/v1/${apf_id_to_use}/service-apis ... json=${request_body} Loading @@ -846,6 +853,9 @@ Publish Service Api ... ${vendor_specific_aef_profile}=${None} ... ${aef_id}=${NONE} ... ${api_status}=${NONE} ... ${security_methods}=default ... ${domain_name}=${NONE} ... ${interface_descriptions}=${NONE} ${resp} ${request_body}= Publish Service Api Request ... ${register_user_info_provider} Loading @@ -857,6 +867,9 @@ Publish Service Api ... ${vendor_specific_aef_profile} ... ${aef_id} ... ${api_status} ... ${security_methods} ... ${domain_name} ... ${interface_descriptions} Check Response Variable Type And Values ${resp} 201 ServiceAPIDescription Dictionary Should Contain Key ${resp.json()} apiId Loading