Loading services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'provider_id':api_provider_id, "role": "AMF"} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for provider") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate amf" Loading services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["role"] != "AMF": prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -20,15 +20,11 @@ class ControlAccess(Resource): my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +9 −13 Original line number Diff line number Diff line Loading @@ -19,21 +19,17 @@ class ControlAccess(Resource): my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for subscriber") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if (event_id is None and cert_entry["cert_signature"] != cert_signature): prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) Loading services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'id':aef_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for AEF") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading Loading
services/TS29222_CAPIF_API_Provider_Management_API/api_provider_management/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'provider_id':api_provider_id, "role": "AMF"} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for provider") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate amf" Loading
services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["role"] != "AMF": prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading
services/TS29222_CAPIF_Discover_Service_API/service_apis/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -20,15 +20,11 @@ class ControlAccess(Resource): my_query = {'id': api_invoker_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for invoker") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading
services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +9 −13 Original line number Diff line number Diff line Loading @@ -19,21 +19,17 @@ class ControlAccess(Resource): my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for subscriber") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if (event_id is None and cert_entry["cert_signature"] != cert_signature): prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) Loading
services/TS29222_CAPIF_Logging_API_Invocation_API/api_invocation_logs/core/validate_user.py +5 −9 Original line number Diff line number Diff line Loading @@ -19,15 +19,11 @@ class ControlAccess(Resource): my_query = {'id':aef_id} cert_entry = cert_col.find_one(my_query) if cert_entry is None: prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="Certificate not found for AEF") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: prob = ProblemDetails(title="Forbidden", detail="User not authorized", cause="You are not the owner of this resource") prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=403, mimetype="application/json") return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" Loading