Loading services/TS29222_CAPIF_Events_API/capif_events/controllers/default_controller.py +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ def cert_validation(): if request.method != 'POST': result = valid_user.validate_user_cert(args["subscriptionId"], args["subscriberId"], cert_signature) else: result = valid_user.validate_user_cert_post(args["subscriberId"], cert_signature) result = valid_user.validate_user_cert(None, args["subscriberId"], cert_signature) if result is not None: return result Loading services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +8 −24 Original line number Diff line number Diff line Loading @@ -20,32 +20,16 @@ class ControlAccess(Resource): cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]: if (event_id is None and cert_entry["cert_signature"] != cert_signature): prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def validate_user_cert_post(self, subscriber_id, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) Loading Loading
services/TS29222_CAPIF_Events_API/capif_events/controllers/default_controller.py +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ def cert_validation(): if request.method != 'POST': result = valid_user.validate_user_cert(args["subscriptionId"], args["subscriberId"], cert_signature) else: result = valid_user.validate_user_cert_post(args["subscriberId"], cert_signature) result = valid_user.validate_user_cert(None, args["subscriberId"], cert_signature) if result is not None: return result Loading
services/TS29222_CAPIF_Events_API/capif_events/core/validate_user.py +8 −24 Original line number Diff line number Diff line Loading @@ -20,32 +20,16 @@ class ControlAccess(Resource): cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]: if (event_id is None and cert_entry["cert_signature"] != cert_signature): prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def validate_user_cert_post(self, subscriber_id, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'id':subscriber_id} cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["cert_signature"] != cert_signature: elif event_id is not None and (cert_entry["cert_signature"] != cert_signature or "event_subscriptions" not in cert_entry["resources"] or event_id not in cert_entry["resources"]["event_subscriptions"]): prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate subscriber" current_app.logger.error(exception + "::" + str(e)) Loading