Loading services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/provider_enrolment_details_api.py +33 −9 Original line number Diff line number Diff line Loading @@ -77,9 +77,17 @@ class ProviderManagementOperations(Resource): for api_provider_func in api_provider_enrolment_details.api_prov_funcs: api_provider_func.api_prov_func_id = api_provider_func.api_prov_func_role + \ str(secrets.token_hex(15)) try: certificate = sign_certificate( api_provider_func.reg_info.api_prov_pub_key, api_provider_func.api_prov_func_id) api_provider_func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format or certificate signing error"}] ) self.auth_manager.add_auth_provider(certificate, api_provider_func.api_prov_func_id, api_provider_func.api_prov_func_role, api_provider_enrolment_details.api_prov_dom_id) Loading Loading @@ -172,9 +180,17 @@ class ProviderManagementOperations(Resource): if func.api_prov_func_id is None: func.api_prov_func_id = func.api_prov_func_role + \ str(secrets.token_hex(15)) try: certificate = sign_certificate( func.reg_info.api_prov_pub_key, func.api_prov_func_id) func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format"}] ) self.auth_manager.update_auth_provider( certificate, func.api_prov_func_id, api_prov_dom_id, func.api_prov_func_role) Loading @@ -185,9 +201,17 @@ class ProviderManagementOperations(Resource): if func.api_prov_func_role != api_func["api_prov_func_role"]: return bad_request_error(detail="Bad Role in provider", cause="Different role in update reqeuest", invalid_params=[{"param": "api_prov_func_role", "reason": "different role with same id"}]) if func.reg_info.api_prov_pub_key != api_func["reg_info"]["api_prov_pub_key"]: try: certificate = sign_certificate( func.reg_info.api_prov_pub_key, api_func["api_prov_func_id"]) func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format or certificate signing error"}] ) self.auth_manager.update_auth_provider( certificate, func.api_prov_func_id, api_prov_dom_id, func.api_prov_func_role) Loading services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/sign_certificate.py +7 −0 Original line number Diff line number Diff line Loading @@ -27,4 +27,11 @@ def sign_certificate(publick_key, provider_id): response = requests.request("POST", url, headers=headers, data=json.dumps(data), verify = config["ca_factory"].get("verify", False)) response_payload = json.loads(response.text) if "errors" in response_payload: error_msg = "; ".join(response_payload["errors"]) raise Exception(f"Certificate signing failed: {error_msg}") if "data" not in response_payload or "certificate" not in response_payload["data"]: raise Exception("Vault response missing certificate data") return response_payload["data"]["certificate"] No newline at end of file services/TS29222_CAPIF_Discover_Service_API/service_apis/core/discoveredapis.py +5 −4 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class DiscoverApisOperations(Resource): if "vend-spec" in param: vend_param = param.split("vend-spec-")[1] attribute_path = query_params[param]["target"].split('/') vend_spec_query_params_n_values[attribute_path[1] + "." + vend_param] = query_params[param]["value"] vend_spec_query_params_n_values[".".join(attribute_path[1:]) + "." + vend_param] = query_params[param]["value"] elif param in ["api_version", "comm_type", "protocol", "aef_id", "data_format"]: my_params.append(json.loads(query_params_name[param].replace("QPV", query_params[param]))) else: Loading @@ -102,9 +102,10 @@ class DiscoverApisOperations(Resource): vendor_specific_fields_path = find_attribute_in_body(discoved_api, '') if vendor_specific_fields_path: if vend_spec_query_params_n_values: vs_filtered_apis = filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values, vendor_specific_fields_path) if vs_filtered_apis: json_docs.append(filter_fields(vs_filtered_apis)) pass_filter = filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values) if pass_filter: json_docs.append(filter_fields(discoved_api)) else: json_docs.append(filter_fields(discoved_api)) else: Loading services/TS29222_CAPIF_Discover_Service_API/service_apis/vendor_specific.py +22 −18 Original line number Diff line number Diff line import re from flask import current_app def find_attribute_in_body(test, path): f_key = [] Loading Loading @@ -70,22 +70,26 @@ def remove_vendor_specific_fields(discoved_api, vendor_specific_fields_path): return discoved_api def filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values, vendor_specific_fields_path): def nested_key_exists(dictionary, keys): """ Checks if a nested path of keys exists in a dictionary. """ _dict = dictionary for key in keys: if isinstance(_dict, dict) and key in _dict: _dict = _dict[key] else: return False, -1 return True, _dict def filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values): pass_filter = True for k, v in vend_spec_query_params_n_values.items(): parts = k.split('.') for path in vendor_specific_fields_path: if parts[0] in path: path_pieces = path.split('.') tmp_body = discoved_api vs_field = path_pieces[-1] for path_piece in path_pieces[:-1]: if path_piece.isnumeric(): path_piece = int(path_piece) v_2 = tmp_body[path_piece] tmp_body = v_2 if parts[1] in tmp_body[vs_field].keys(): if tmp_body[vs_field][parts[1]] != v: return {} else: return {} return discoved_api No newline at end of file exists, value = nested_key_exists(discoved_api, parts) if exists: if v != value: pass_filter = False break return pass_filter No newline at end of file services/TS29222_CAPIF_Publish_Service_API/published_apis/core/serviceapidescriptions.py +82 −5 Original line number Diff line number Diff line Loading @@ -24,6 +24,31 @@ publisher_ops = Publisher() service_api_not_found_message = "Service API not found" def find_duplicate_service_by_api_name_and_aef( collection, api_name, aef_profiles, excluded_api_id=None): duplicate_query = {"api_name": api_name} aef_ids = set() for profile in aef_profiles or []: if isinstance(profile, dict): aef_id = profile.get("aef_id") else: aef_id = getattr(profile, "aef_id", None) if aef_id: aef_ids.add(aef_id) aef_ids = sorted(aef_ids) if aef_ids: duplicate_query["aef_profiles.aef_id"] = {"$in": aef_ids} if excluded_api_id: duplicate_query["api_id"] = {"$ne": excluded_api_id} return collection.find_one(duplicate_query), aef_ids def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] Loading Loading @@ -122,19 +147,31 @@ class PublishServiceOperations(Resource): try: current_app.logger.debug("Publishing service") service = mycol.find_one( {"api_name": serviceapidescription.api_name}) serviceapidescription_dict = serviceapidescription.to_dict() service, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, serviceapidescription.api_name, serviceapidescription_dict.get("aef_profiles")) if service is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error(detail="Already registered service with same api name", cause="Found service with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") api_id = secrets.token_hex(15) serviceapidescription.api_id = api_id serviceapidescription_dict["api_id"] = api_id rec = dict() rec['apf_id'] = apf_id rec['onboarding_date'] = datetime.now() serviceapidescription_dict = serviceapidescription.to_dict() if vendor_specific: serviceapidescription_dict = add_vend_spec_fields( Loading @@ -155,7 +192,7 @@ class PublishServiceOperations(Resource): res = make_response(object=clean_n_camel_case( serviceapidescription_dict), status=201) res.headers['Location'] = f"https://{os.getenv("CAPIF_HOSTNAME")}/published-apis/v1/{str(apf_id)}/service-apis/{str(api_id)}" res.headers['Location'] = f"https://{os.getenv('CAPIF_HOSTNAME')}/published-apis/v1/{str(apf_id)}/service-apis/{str(api_id)}" if res.status_code == 201: current_app.logger.info("Service published") Loading Loading @@ -311,6 +348,26 @@ class PublishServiceOperations(Resource): service_api_description["apf_id"] = serviceapidescription_old["apf_id"] service_api_description["onboarding_date"] = serviceapidescription_old["onboarding_date"] service_api_description["api_id"] = serviceapidescription_old["api_id"] service_with_same_identity, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, service_api_description.get("api_name", serviceapidescription_old.get("api_name")), service_api_description.get("aef_profiles", serviceapidescription_old.get("aef_profiles")), excluded_api_id=service_api_description["api_id"]) if service_with_same_identity is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") service_api_description["supported_features"] = return_negotiated_supp_feat_dict(service_api_description["supported_features"])["Final"] if not return_negotiated_supp_feat_dict(service_api_description.get("supported_features"))["ApiStatusMonitoring"] and service_api_description.get("api_status", None) is not None: Loading Loading @@ -400,6 +457,26 @@ class PublishServiceOperations(Resource): api_status = patch_service_api_description.get("api_status", None) supported_features = patch_service_api_description.get("supported_features", None) patch_service_api_description = clean_empty(patch_service_api_description) service_with_same_identity, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, serviceapidescription_old.get("api_name"), patch_service_api_description.get("aef_profiles", serviceapidescription_old.get("aef_profiles")), excluded_api_id=serviceapidescription_old.get("api_id")) if service_with_same_identity is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") if api_status: patch_service_api_description["api_status"]=api_status if supported_features: Loading Loading
services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/provider_enrolment_details_api.py +33 −9 Original line number Diff line number Diff line Loading @@ -77,9 +77,17 @@ class ProviderManagementOperations(Resource): for api_provider_func in api_provider_enrolment_details.api_prov_funcs: api_provider_func.api_prov_func_id = api_provider_func.api_prov_func_role + \ str(secrets.token_hex(15)) try: certificate = sign_certificate( api_provider_func.reg_info.api_prov_pub_key, api_provider_func.api_prov_func_id) api_provider_func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format or certificate signing error"}] ) self.auth_manager.add_auth_provider(certificate, api_provider_func.api_prov_func_id, api_provider_func.api_prov_func_role, api_provider_enrolment_details.api_prov_dom_id) Loading Loading @@ -172,9 +180,17 @@ class ProviderManagementOperations(Resource): if func.api_prov_func_id is None: func.api_prov_func_id = func.api_prov_func_role + \ str(secrets.token_hex(15)) try: certificate = sign_certificate( func.reg_info.api_prov_pub_key, func.api_prov_func_id) func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format"}] ) self.auth_manager.update_auth_provider( certificate, func.api_prov_func_id, api_prov_dom_id, func.api_prov_func_role) Loading @@ -185,9 +201,17 @@ class ProviderManagementOperations(Resource): if func.api_prov_func_role != api_func["api_prov_func_role"]: return bad_request_error(detail="Bad Role in provider", cause="Different role in update reqeuest", invalid_params=[{"param": "api_prov_func_role", "reason": "different role with same id"}]) if func.reg_info.api_prov_pub_key != api_func["reg_info"]["api_prov_pub_key"]: try: certificate = sign_certificate( func.reg_info.api_prov_pub_key, api_func["api_prov_func_id"]) func.reg_info.api_prov_cert = certificate except Exception as e: current_app.logger.error(f"Certificate signing failed: {str(e)}") return bad_request_error( detail="Certificate signing failed", cause=str(e), invalid_params=[{"param": "apiProvPubKey", "reason": "Invalid public key format or certificate signing error"}] ) self.auth_manager.update_auth_provider( certificate, func.api_prov_func_id, api_prov_dom_id, func.api_prov_func_role) Loading
services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/sign_certificate.py +7 −0 Original line number Diff line number Diff line Loading @@ -27,4 +27,11 @@ def sign_certificate(publick_key, provider_id): response = requests.request("POST", url, headers=headers, data=json.dumps(data), verify = config["ca_factory"].get("verify", False)) response_payload = json.loads(response.text) if "errors" in response_payload: error_msg = "; ".join(response_payload["errors"]) raise Exception(f"Certificate signing failed: {error_msg}") if "data" not in response_payload or "certificate" not in response_payload["data"]: raise Exception("Vault response missing certificate data") return response_payload["data"]["certificate"] No newline at end of file
services/TS29222_CAPIF_Discover_Service_API/service_apis/core/discoveredapis.py +5 −4 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class DiscoverApisOperations(Resource): if "vend-spec" in param: vend_param = param.split("vend-spec-")[1] attribute_path = query_params[param]["target"].split('/') vend_spec_query_params_n_values[attribute_path[1] + "." + vend_param] = query_params[param]["value"] vend_spec_query_params_n_values[".".join(attribute_path[1:]) + "." + vend_param] = query_params[param]["value"] elif param in ["api_version", "comm_type", "protocol", "aef_id", "data_format"]: my_params.append(json.loads(query_params_name[param].replace("QPV", query_params[param]))) else: Loading @@ -102,9 +102,10 @@ class DiscoverApisOperations(Resource): vendor_specific_fields_path = find_attribute_in_body(discoved_api, '') if vendor_specific_fields_path: if vend_spec_query_params_n_values: vs_filtered_apis = filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values, vendor_specific_fields_path) if vs_filtered_apis: json_docs.append(filter_fields(vs_filtered_apis)) pass_filter = filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values) if pass_filter: json_docs.append(filter_fields(discoved_api)) else: json_docs.append(filter_fields(discoved_api)) else: Loading
services/TS29222_CAPIF_Discover_Service_API/service_apis/vendor_specific.py +22 −18 Original line number Diff line number Diff line import re from flask import current_app def find_attribute_in_body(test, path): f_key = [] Loading Loading @@ -70,22 +70,26 @@ def remove_vendor_specific_fields(discoved_api, vendor_specific_fields_path): return discoved_api def filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values, vendor_specific_fields_path): def nested_key_exists(dictionary, keys): """ Checks if a nested path of keys exists in a dictionary. """ _dict = dictionary for key in keys: if isinstance(_dict, dict) and key in _dict: _dict = _dict[key] else: return False, -1 return True, _dict def filter_apis_with_vendor_specific_params(discoved_api, vend_spec_query_params_n_values): pass_filter = True for k, v in vend_spec_query_params_n_values.items(): parts = k.split('.') for path in vendor_specific_fields_path: if parts[0] in path: path_pieces = path.split('.') tmp_body = discoved_api vs_field = path_pieces[-1] for path_piece in path_pieces[:-1]: if path_piece.isnumeric(): path_piece = int(path_piece) v_2 = tmp_body[path_piece] tmp_body = v_2 if parts[1] in tmp_body[vs_field].keys(): if tmp_body[vs_field][parts[1]] != v: return {} else: return {} return discoved_api No newline at end of file exists, value = nested_key_exists(discoved_api, parts) if exists: if v != value: pass_filter = False break return pass_filter No newline at end of file
services/TS29222_CAPIF_Publish_Service_API/published_apis/core/serviceapidescriptions.py +82 −5 Original line number Diff line number Diff line Loading @@ -24,6 +24,31 @@ publisher_ops = Publisher() service_api_not_found_message = "Service API not found" def find_duplicate_service_by_api_name_and_aef( collection, api_name, aef_profiles, excluded_api_id=None): duplicate_query = {"api_name": api_name} aef_ids = set() for profile in aef_profiles or []: if isinstance(profile, dict): aef_id = profile.get("aef_id") else: aef_id = getattr(profile, "aef_id", None) if aef_id: aef_ids.add(aef_id) aef_ids = sorted(aef_ids) if aef_ids: duplicate_query["aef_profiles.aef_id"] = {"$in": aef_ids} if excluded_api_id: duplicate_query["api_id"] = {"$ne": excluded_api_id} return collection.find_one(duplicate_query), aef_ids def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] Loading Loading @@ -122,19 +147,31 @@ class PublishServiceOperations(Resource): try: current_app.logger.debug("Publishing service") service = mycol.find_one( {"api_name": serviceapidescription.api_name}) serviceapidescription_dict = serviceapidescription.to_dict() service, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, serviceapidescription.api_name, serviceapidescription_dict.get("aef_profiles")) if service is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error(detail="Already registered service with same api name", cause="Found service with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") api_id = secrets.token_hex(15) serviceapidescription.api_id = api_id serviceapidescription_dict["api_id"] = api_id rec = dict() rec['apf_id'] = apf_id rec['onboarding_date'] = datetime.now() serviceapidescription_dict = serviceapidescription.to_dict() if vendor_specific: serviceapidescription_dict = add_vend_spec_fields( Loading @@ -155,7 +192,7 @@ class PublishServiceOperations(Resource): res = make_response(object=clean_n_camel_case( serviceapidescription_dict), status=201) res.headers['Location'] = f"https://{os.getenv("CAPIF_HOSTNAME")}/published-apis/v1/{str(apf_id)}/service-apis/{str(api_id)}" res.headers['Location'] = f"https://{os.getenv('CAPIF_HOSTNAME')}/published-apis/v1/{str(apf_id)}/service-apis/{str(api_id)}" if res.status_code == 201: current_app.logger.info("Service published") Loading Loading @@ -311,6 +348,26 @@ class PublishServiceOperations(Resource): service_api_description["apf_id"] = serviceapidescription_old["apf_id"] service_api_description["onboarding_date"] = serviceapidescription_old["onboarding_date"] service_api_description["api_id"] = serviceapidescription_old["api_id"] service_with_same_identity, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, service_api_description.get("api_name", serviceapidescription_old.get("api_name")), service_api_description.get("aef_profiles", serviceapidescription_old.get("aef_profiles")), excluded_api_id=service_api_description["api_id"]) if service_with_same_identity is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") service_api_description["supported_features"] = return_negotiated_supp_feat_dict(service_api_description["supported_features"])["Final"] if not return_negotiated_supp_feat_dict(service_api_description.get("supported_features"))["ApiStatusMonitoring"] and service_api_description.get("api_status", None) is not None: Loading Loading @@ -400,6 +457,26 @@ class PublishServiceOperations(Resource): api_status = patch_service_api_description.get("api_status", None) supported_features = patch_service_api_description.get("supported_features", None) patch_service_api_description = clean_empty(patch_service_api_description) service_with_same_identity, aef_ids = find_duplicate_service_by_api_name_and_aef( mycol, serviceapidescription_old.get("api_name"), patch_service_api_description.get("aef_profiles", serviceapidescription_old.get("aef_profiles")), excluded_api_id=serviceapidescription_old.get("api_id")) if service_with_same_identity is not None: if aef_ids: current_app.logger.error( "Service already registered with same api_name/aef_id pair") return forbidden_error( detail="Already registered service with same api name and aef id", cause="Found service with same api name and aef id") current_app.logger.error( "Service already registered with same api name") return forbidden_error( detail="Already registered service with same api name", cause="Found service with same api name") if api_status: patch_service_api_description["api_status"]=api_status if supported_features: Loading