From f47bb84388972b5e48bb1fc14f8f223576d98497 Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Wed, 4 Mar 2026 10:04:50 +0000 Subject: [PATCH 1/6] Refactor user validation logic to fix fail-open pattern vulnerability --- .../core/validate_user.py | 14 +++-- .../core/validate_user.py | 14 +++-- .../logs/core/validate_user.py | 14 +++-- .../service_apis/core/validate_user.py | 14 +++-- .../capif_events/core/validate_user.py | 22 ++++---- .../api_invocation_logs/core/validate_user.py | 14 +++-- .../published_apis/core/validate_user.py | 52 +++++++++++-------- .../capif_security/core/validate_user.py | 14 +++-- .../visibility_control/core/validate_user.py | 26 ++++++---- 9 files changed, 115 insertions(+), 69 deletions(-) diff --git a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py index 917c4ccb..03b013a5 100644 --- a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py +++ b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py @@ -19,11 +19,15 @@ class ControlAccess(Resource): my_query = {'invoker_id':api_invoker_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py b/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py index fea52c11..dac83eb2 100644 --- a/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py +++ b/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py @@ -19,11 +19,15 @@ class ControlAccess(Resource): my_query = {'provider_id':api_provider_id, "role": "AMF"} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for provider") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate amf" diff --git a/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py b/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py index 545642e1..68aed77a 100644 --- a/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py +++ b/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py @@ -19,11 +19,15 @@ class ControlAccess(Resource): my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["role"] != "AMF": - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["role"] != "AMF": + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py b/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py index 6d2ea40f..6dbd438e 100644 --- a/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py +++ b/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py @@ -20,11 +20,15 @@ class ControlAccess(Resource): my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py b/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py index 8d1f8b05..60857a14 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py @@ -19,17 +19,21 @@ class ControlAccess(Resource): my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if (event_id is None and cert_entry["cert_signature"] != cert_signature): - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for subscriber") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) + if (event_id is None and cert_entry["cert_signature"] != cert_signature): + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) diff --git a/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py b/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py index f3f5cb6f..9cce1fa3 100644 --- a/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py +++ b/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py @@ -19,11 +19,15 @@ class ControlAccess(Resource): my_query = {'id':aef_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for AEF") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py b/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py index 1782fe04..7d2624f0 100644 --- a/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py +++ b/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py @@ -19,29 +19,39 @@ class ControlAccess(Resource): my_query = {'id': apf_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - is_user_owner = True - if cert_entry["cert_signature"] != cert_signature: + if cert_entry is None: + prob = ProblemDetails( + title="Unauthorized", + detail="User not authorized", + cause="Certificate not found for APF") + prob = serialize_clean_camel_case(prob) + return Response( + json.dumps(prob, cls=CustomJSONEncoder), + status=401, + mimetype="application/json") + + is_user_owner = True + if cert_entry["cert_signature"] != cert_signature: + is_user_owner = False + elif service_id: + if "services" not in cert_entry["resources"]: is_user_owner = False - elif service_id: - if "services" not in cert_entry["resources"]: + elif cert_entry.get("resources") and cert_entry["resources"].get("services"): + if service_id not in cert_entry["resources"].get("services"): is_user_owner = False - elif cert_entry.get("resources") and cert_entry["resources"].get("services"): - if service_id not in cert_entry["resources"].get("services"): - is_user_owner = False - if is_user_owner == False: - current_app.logger.info("STEP3") - prob = ProblemDetails( - title="Unauthorized", - detail="User not authorized", - cause="You are not the owner of this resource") - current_app.logger.info("STEP4") - prob = serialize_clean_camel_case(prob) - current_app.logger.info("STEP5") - return Response( - json.dumps(prob, cls=CustomJSONEncoder), - status=401, - mimetype="application/json") + if is_user_owner == False: + current_app.logger.info("STEP3") + prob = ProblemDetails( + title="Forbidden", + detail="User not authorized", + cause="You are not the owner of this resource") + current_app.logger.info("STEP4") + prob = serialize_clean_camel_case(prob) + current_app.logger.info("STEP5") + return Response( + json.dumps(prob, cls=CustomJSONEncoder), + status=403, + mimetype="application/json") except Exception as e: exception = "An exception occurred in validate apf" diff --git a/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py b/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py index 29180d1a..f4074f17 100644 --- a/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py +++ b/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py @@ -18,11 +18,15 @@ class ControlAccess(Resource): my_query = {'id':invoker_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is None: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/helper/helper_service/services/visibility_control/core/validate_user.py b/services/helper/helper_service/services/visibility_control/core/validate_user.py index 750bb9d5..a4dbb59d 100644 --- a/services/helper/helper_service/services/visibility_control/core/validate_user.py +++ b/services/helper/helper_service/services/visibility_control/core/validate_user.py @@ -16,15 +16,23 @@ class ControlAccess(Resource): my_query = {'provider_id': api_provider_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is not None: - if cert_entry["cert_signature"] != cert_signature: - # Return 401 if signatures don't match - prob = { - "title": "Unauthorized", - "detail": "User not authorized", - "cause": "You are not the owner of this resource" - } - return Response(json.dumps(prob), status=401, mimetype="application/json") + if cert_entry is None: + # Explicit denial when certificate not found + prob = { + "title": "Unauthorized", + "detail": "User not authorized", + "cause": "Certificate not found for provider" + } + return Response(json.dumps(prob), status=401, mimetype="application/json") + + if cert_entry["cert_signature"] != cert_signature: + # Return 403 if signatures don't match + prob = { + "title": "Forbidden", + "detail": "User not authorized", + "cause": "You are not the owner of this resource" + } + return Response(json.dumps(prob), status=403, mimetype="application/json") return None except Exception as e: current_app.logger.error("Error in validate_user_cert: " + str(e)) -- GitLab From 91e9a0172ef4c6b9e2155091242d358c0a3e28fb Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Thu, 26 Mar 2026 21:21:40 +0000 Subject: [PATCH 2/6] Fix mismatch of query parameter for auth_manager methods --- .../api_invoker_management/core/auth_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py index 4614d64f..24290bfa 100644 --- a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py +++ b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py @@ -10,15 +10,15 @@ class AuthManager(Resource): cert_col = self.db.get_col_by_name(self.db.certs_col) cert_encode = x509.load_pem_x509_certificate(str.encode(cert), default_backend()) - cert_col.insert_one({"cert_signature":cert_encode.signature.hex(), "role":"invoker", "id": invoker_id, "resources": {}}) + cert_col.insert_one({"cert_signature":cert_encode.signature.hex(), "role":"invoker", "invoker_id": invoker_id, "resources": {}}) def update_auth_invoker(self, cert, invoker_id): cert_col = self.db.get_col_by_name(self.db.certs_col) cert_encode = x509.load_pem_x509_certificate(str.encode(cert), default_backend()) - cert_col.find_one_and_update({"id":invoker_id}, {"$set":{"cert_signature":cert_encode.signature.hex()}}) + cert_col.find_one_and_update({"invoker_id":invoker_id}, {"$set":{"cert_signature":cert_encode.signature.hex()}}) def remove_auth_invoker(self, invoker_id): cert_col = self.db.get_col_by_name(self.db.certs_col) - cert_col.delete_one({"id":invoker_id}) + cert_col.delete_one({"invoker_id":invoker_id}) -- GitLab From 54e3b8310e86d2499f76e41b005658c77cb3934e Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Thu, 26 Mar 2026 21:28:43 +0000 Subject: [PATCH 3/6] Refactor user certificate validation to use standardized error responses --- .../api_invoker_management/core/validate_user.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py index 03b013a5..a36cdd0c 100644 --- a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py +++ b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py @@ -6,7 +6,7 @@ from ..encoder import CustomJSONEncoder from ..models.problem_details import ProblemDetails from ..util import serialize_clean_camel_case from .resources import Resource -from .responses import internal_server_error +from .responses import forbidden_error, internal_server_error, unauthorized_error class ControlAccess(Resource): @@ -20,14 +20,10 @@ class ControlAccess(Resource): cert_entry = cert_col.find_one(my_query) if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - + return unauthorized_error(detail="Please provide an existing Network App ID", cause="Certificate not found for invoker") + if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + return forbidden_error(detail="User not authorized", cause="You are not the owner of this resource") except Exception as e: exception = "An exception occurred in validate invoker" -- GitLab From 832314df8546e4407f41083f8da7a67543c14309 Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Thu, 26 Mar 2026 21:31:05 +0000 Subject: [PATCH 4/6] Fix failling tests --- .../capif_api_invoker_managenet.robot | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/tests/features/CAPIF Api Invoker Management/capif_api_invoker_managenet.robot b/tests/features/CAPIF Api Invoker Management/capif_api_invoker_managenet.robot index 3052e194..ed52f8f6 100644 --- a/tests/features/CAPIF Api Invoker Management/capif_api_invoker_managenet.robot +++ b/tests/features/CAPIF Api Invoker Management/capif_api_invoker_managenet.robot @@ -97,11 +97,11 @@ Update Not Onboarded Network App ... username=${INVOKER_USERNAME} # Check Results - Check Response Variable Type And Values ${resp} 404 ProblemDetails - ... status=404 - ... title=Not Found + Check Response Variable Type And Values ${resp} 401 ProblemDetails + ... status=401 + ... title=Unauthorized ... detail=Please provide an existing Network App ID - ... cause=Not exist Network App ID + ... cause=Certificate not found for invoker Offboard Network App [Tags] capif_api_invoker_management-5 @@ -131,11 +131,11 @@ Offboard Not Previously Onboarded Network App ... username=${INVOKER_USERNAME} # Check Results - Check Response Variable Type And Values ${resp} 404 ProblemDetails - ... status=404 - ... title=Not Found + Check Response Variable Type And Values ${resp} 401 ProblemDetails + ... status=401 + ... title=Unauthorized ... detail=Please provide an existing Network App ID - ... cause=Not exist Network App ID + ... cause=Certificate not found for invoker Update Onboarded Network App Certificate [Tags] capif_api_invoker_management-7 @@ -183,6 +183,30 @@ Update Onboarded Network App Certificate Check Response Variable Type And Values ${resp} 200 APIInvokerEnrolmentDetails ... notificationDestination=${new_notification_destination} + #Revert to old invoker username and certificate + ${csr_request_old}= Create User Csr ${INVOKER_USERNAME} invoker + + ${old_onboarding_notification_body}= Create Onboarding Notification Body + ... http://${CAPIF_CALLBACK_IP}:${CAPIF_CALLBACK_PORT}/netapp_callback + ... ${csr_request_old} + ... ${INVOKER_USERNAME_NEW} + + Set To Dictionary + ... ${request_body} + ... onboardingInformation=${old_onboarding_notification_body['onboardingInformation']} + + ${resp}= Put Request Capif + ... ${url.path} + ... ${request_body} + ... server=${CAPIF_HTTPS_URL} + ... verify=ca.crt + ... username=${INVOKER_USERNAME_NEW} + + Check Response Variable Type And Values ${resp} 200 APIInvokerEnrolmentDetails + + Store In File ${INVOKER_USERNAME}.crt ${resp.json()['onboardingInformation']['apiInvokerCertificate']} + + Onboard invoker without supported_features [Tags] capif_api_invoker_management-8 # Default Invoker Registration and Onboarding -- GitLab From 032e16a2c2efef3da631237b574b0a0fa6e4c41e Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Fri, 27 Mar 2026 12:37:17 +0000 Subject: [PATCH 5/6] Revert "Refactor user validation logic to fix fail-open pattern vulnerability" This reverts commit f47bb84388972b5e48bb1fc14f8f223576d98497. --- .../core/validate_user.py | 14 ++--- .../logs/core/validate_user.py | 14 ++--- .../service_apis/core/validate_user.py | 14 ++--- .../capif_events/core/validate_user.py | 22 ++++---- .../api_invocation_logs/core/validate_user.py | 14 ++--- .../published_apis/core/validate_user.py | 52 ++++++++----------- .../capif_security/core/validate_user.py | 14 ++--- .../visibility_control/core/validate_user.py | 26 ++++------ 8 files changed, 64 insertions(+), 106 deletions(-) diff --git a/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py b/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py index dac83eb2..fea52c11 100644 --- a/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py +++ b/services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'provider_id':api_provider_id, "role": "AMF"} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for provider") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate amf" diff --git a/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py b/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py index 68aed77a..545642e1 100644 --- a/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py +++ b/services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - - if cert_entry["role"] != "AMF": - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["role"] != "AMF": + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py b/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py index 6dbd438e..6d2ea40f 100644 --- a/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py +++ b/services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py @@ -20,15 +20,11 @@ class ControlAccess(Resource): my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py b/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py index 60857a14..8d1f8b05 100644 --- a/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +++ b/services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py @@ -19,21 +19,17 @@ class ControlAccess(Resource): my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for subscriber") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + if cert_entry is not None: + if (event_id is None and cert_entry["cert_signature"] != cert_signature): + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) - if (event_id is None and cert_entry["cert_signature"] != cert_signature): - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") + elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") - elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) diff --git a/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py b/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py index 9cce1fa3..f3f5cb6f 100644 --- a/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py +++ b/services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'id':aef_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for AEF") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py b/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py index 7d2624f0..1782fe04 100644 --- a/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py +++ b/services/TS29222_CAPIF_Publish_Service_API/published_apis/core/validate_user.py @@ -19,39 +19,29 @@ class ControlAccess(Resource): my_query = {'id': apf_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails( - title="Unauthorized", - detail="User not authorized", - cause="Certificate not found for APF") - prob = serialize_clean_camel_case(prob) - return Response( - json.dumps(prob, cls=CustomJSONEncoder), - status=401, - mimetype="application/json") - - is_user_owner = True - if cert_entry["cert_signature"] != cert_signature: - is_user_owner = False - elif service_id: - if "services" not in cert_entry["resources"]: + if cert_entry is not None: + is_user_owner = True + if cert_entry["cert_signature"] != cert_signature: is_user_owner = False - elif cert_entry.get("resources") and cert_entry["resources"].get("services"): - if service_id not in cert_entry["resources"].get("services"): + elif service_id: + if "services" not in cert_entry["resources"]: is_user_owner = False - if is_user_owner == False: - current_app.logger.info("STEP3") - prob = ProblemDetails( - title="Forbidden", - detail="User not authorized", - cause="You are not the owner of this resource") - current_app.logger.info("STEP4") - prob = serialize_clean_camel_case(prob) - current_app.logger.info("STEP5") - return Response( - json.dumps(prob, cls=CustomJSONEncoder), - status=403, - mimetype="application/json") + elif cert_entry.get("resources") and cert_entry["resources"].get("services"): + if service_id not in cert_entry["resources"].get("services"): + is_user_owner = False + if is_user_owner == False: + current_app.logger.info("STEP3") + prob = ProblemDetails( + title="Unauthorized", + detail="User not authorized", + cause="You are not the owner of this resource") + current_app.logger.info("STEP4") + prob = serialize_clean_camel_case(prob) + current_app.logger.info("STEP5") + return Response( + json.dumps(prob, cls=CustomJSONEncoder), + status=401, + mimetype="application/json") except Exception as e: exception = "An exception occurred in validate apf" diff --git a/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py b/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py index f4074f17..29180d1a 100644 --- a/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py +++ b/services/TS29222_CAPIF_Security_API/capif_security/core/validate_user.py @@ -18,15 +18,11 @@ class ControlAccess(Resource): my_query = {'id':invoker_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") - - if cert_entry["cert_signature"] != cert_signature: - prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") - prob = serialize_clean_camel_case(prob) - return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["cert_signature"] != cert_signature: + prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") + prob = serialize_clean_camel_case(prob) + return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" diff --git a/services/helper/helper_service/services/visibility_control/core/validate_user.py b/services/helper/helper_service/services/visibility_control/core/validate_user.py index a4dbb59d..750bb9d5 100644 --- a/services/helper/helper_service/services/visibility_control/core/validate_user.py +++ b/services/helper/helper_service/services/visibility_control/core/validate_user.py @@ -16,23 +16,15 @@ class ControlAccess(Resource): my_query = {'provider_id': api_provider_id} cert_entry = cert_col.find_one(my_query) - if cert_entry is None: - # Explicit denial when certificate not found - prob = { - "title": "Unauthorized", - "detail": "User not authorized", - "cause": "Certificate not found for provider" - } - return Response(json.dumps(prob), status=401, mimetype="application/json") - - if cert_entry["cert_signature"] != cert_signature: - # Return 403 if signatures don't match - prob = { - "title": "Forbidden", - "detail": "User not authorized", - "cause": "You are not the owner of this resource" - } - return Response(json.dumps(prob), status=403, mimetype="application/json") + if cert_entry is not None: + if cert_entry["cert_signature"] != cert_signature: + # Return 401 if signatures don't match + prob = { + "title": "Unauthorized", + "detail": "User not authorized", + "cause": "You are not the owner of this resource" + } + return Response(json.dumps(prob), status=401, mimetype="application/json") return None except Exception as e: current_app.logger.error("Error in validate_user_cert: " + str(e)) -- GitLab From ae7c1c424b1842f63f45fa18213eba5cc22ef203 Mon Sep 17 00:00:00 2001 From: Afonso Castanheta Date: Mon, 30 Mar 2026 21:55:42 +0000 Subject: [PATCH 6/6] Change identifier in Invoker Management for consistency with other services. --- .../api_invoker_management/core/auth_manager.py | 6 +++--- .../api_invoker_management/core/validate_user.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py index 24290bfa..4614d64f 100644 --- a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py +++ b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/auth_manager.py @@ -10,15 +10,15 @@ class AuthManager(Resource): cert_col = self.db.get_col_by_name(self.db.certs_col) cert_encode = x509.load_pem_x509_certificate(str.encode(cert), default_backend()) - cert_col.insert_one({"cert_signature":cert_encode.signature.hex(), "role":"invoker", "invoker_id": invoker_id, "resources": {}}) + cert_col.insert_one({"cert_signature":cert_encode.signature.hex(), "role":"invoker", "id": invoker_id, "resources": {}}) def update_auth_invoker(self, cert, invoker_id): cert_col = self.db.get_col_by_name(self.db.certs_col) cert_encode = x509.load_pem_x509_certificate(str.encode(cert), default_backend()) - cert_col.find_one_and_update({"invoker_id":invoker_id}, {"$set":{"cert_signature":cert_encode.signature.hex()}}) + cert_col.find_one_and_update({"id":invoker_id}, {"$set":{"cert_signature":cert_encode.signature.hex()}}) def remove_auth_invoker(self, invoker_id): cert_col = self.db.get_col_by_name(self.db.certs_col) - cert_col.delete_one({"invoker_id":invoker_id}) + cert_col.delete_one({"id":invoker_id}) diff --git a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py index a36cdd0c..6e7aeb41 100644 --- a/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py +++ b/services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/validate_user.py @@ -16,7 +16,7 @@ class ControlAccess(Resource): cert_col = self.db.get_col_by_name(self.db.certs_col) try: - my_query = {'invoker_id':api_invoker_id} + my_query = {'id':api_invoker_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: -- GitLab