Loading .gitignore +5 −1 Original line number Original line Diff line number Diff line Loading @@ -37,4 +37,8 @@ results helm/capif/*.lock helm/capif/*.lock helm/capif/charts/tempo* helm/capif/charts/tempo* *.bakresults/ .venv/ tests/features/Helper/Visibility Control Api/*.html tests/features/Helper/Visibility Control Api/*.xml *.bak *.bak services/helper/helper_service/openapi_helper_visibility_control.yaml +3 −3 Original line number Original line Diff line number Diff line Loading @@ -55,7 +55,7 @@ paths: allow_except_some_invokers: allow_except_some_invokers: value: value: providerSelector: providerSelector: createdByUser: "userA" userName: "userA" apiProviderId: [ "capif-prov-01", "capif-prov-02" ] apiProviderId: [ "capif-prov-01", "capif-prov-02" ] apiName: [ "apiName-001" ] apiName: [ "apiName-001" ] apiId: [ "apiId-001" ] apiId: [ "apiId-001" ] Loading Loading @@ -293,9 +293,9 @@ components: Provider-side selector. Arrays apply OR within the field; AND across fields. Provider-side selector. Arrays apply OR within the field; AND across fields. At least one of these fields must be present. At least one of these fields must be present. required: required: - createdByUser - userName properties: properties: createdByUser: userName: type: string type: string minLength: 1 minLength: 1 apiProviderId: apiProviderId: Loading services/helper/helper_service/services/visibility_control/auth.py +0 −27 Original line number Original line Diff line number Diff line Loading @@ -34,30 +34,3 @@ def cert_validation(): return __cert_validation return __cert_validation return _cert_validation return _cert_validation # def cert_validation(): # def _cert_validation(f): # @wraps(f) # def __cert_validation(*args, **kwargs): # # 1. Validación de existencia de certificado (Literal del ejemplo) # cert_tmp = request.headers['X-Ssl-Client-Cert'] # cert_raw = cert_tmp.replace('\\t', '') # cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) # cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() # # Guardamos para que el Core los vea # request.user_cn = cn # request.cert_signature = cert.signature.hex() # if cn != "superadmin": # cert_signature = cert.signature.hex() # # Aquí, si hay un ID en la URL (como rule_id), podríamos validar. # # Pero para el POST, como el ID está en el cuerpo, # # simplemente dejamos que el Core lo gestione para no romper el wrap. # # Si quieres que el wrap sea universal, lo dejamos pasar tras validar el CN # pass # return f(**kwargs) # return __cert_validation # return _cert_validation No newline at end of file services/helper/helper_service/services/visibility_control/core/visibility_control_core.py +233 −51 Original line number Original line Diff line number Diff line Loading @@ -8,23 +8,63 @@ from visibility_control.core.validate_user import ControlAccess valid_user = ControlAccess() valid_user = ControlAccess() # def get_all_rules(): # db = get_mongo() # # Usamos la colección configurada en el helper # col = db.get_col_by_name("visibility_rules") # rules = list(col.find({}, {"_id": 0})) # return {"rules": rules}, 200 # def get_all_rules(): # """ # Retrieve visibility rules. # - If superadmin: Returns all rules without filtering. # - If provider: Returns only rules matching the certificate identity (CN). # """ # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # # Safely retrieve identity from request object # cn = request.user_cn # # Professional logic tree: # # 1. Bypass security filters for Superadmin # if cn != "superadmin": # rules = list(col.find({"userName": cn}, {"_id": 0})) # return {"rules": rules}, 200 # rules = list(col.find({}, {"_id": 0})) # return {"rules": rules}, 200 def get_all_rules(): def get_all_rules(): db = get_mongo() db = get_mongo() # Usamos la colección configurada en el helper col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") rules = list(col.find({}, {"_id": 0})) return {"rules": rules}, 200 # def create_new_rule(body): # The ID from the certificate (e.g., AMFe9d24...) # db = get_mongo() cn = getattr(request, 'user_cn', None) # col = db.get_col_by_name("visibility_rules") # # Generamos el ruleId (Server generates it, según tu comentario) # 1. Superadmin: No filters, returns everything # body['ruleId'] = str(uuid.uuid4()) if cn != "superadmin": # We look into CAPIF's provider registration to find the friendly name # assigned to this specific Certificate ID (CN) prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # The query uses an $or operator to ensure visibility: # - Rules where I am the owner (friendly_name) # - Rules I created or updated myself (cn) query_conditions = [{"updatedBy": cn}] if friendly_name: query_conditions.append({"providerSelector.userName": friendly_name}) rules = list(col.find({"$or": query_conditions}, {"_id": 0})) return {"rules": rules}, 200 rules = list(col.find({}, {"_id": 0})) return {"rules": rules}, 200 # col.insert_one(body) # body.pop('_id', None) # return body, 201 def create_new_rule(body): def create_new_rule(body): db = get_mongo() db = get_mongo() Loading @@ -40,20 +80,20 @@ def create_new_rule(body): # We check apiProviderId if it exists, but we focus on the mandatory identity # We check apiProviderId if it exists, but we focus on the mandatory identity api_id = ps.get('apiProviderId', [None])[0] api_id = ps.get('apiProviderId', [None])[0] created_by = ps.get('createdByUser') user_name = ps.get('userName') # Use the available ID to validate ownership via certificate signature # Use the available ID to validate ownership via certificate signature # We prioritize apiProviderId, then createdByUser as fallback for validation # We prioritize apiProviderId, then userName as fallback for validation user_to_validate = api_id if api_id else created_by user_to_validate = api_id if api_id else user_name if user_to_validate: if user_to_validate: result = valid_user.validate_user_cert(user_to_validate, cert_sig) result = valid_user.validate_user_cert(user_to_validate, cert_sig) if result is not None: if result is not None: return result return result else: else: # If even createdByUser is missing (despite being mandatory in your logic), # If even userName is missing (despite being mandatory in your logic), # we block it or handle it as a Bad Request # we block it or handle it as a Bad Request return {"title": "Bad Request", "detail": "createdByUser is mandatory"}, 400 return {"title": "Bad Request", "detail": "userName is mandatory"}, 400 # 1. Generate a unique ruleId # 1. Generate a unique ruleId body['ruleId'] = str(uuid.uuid4()) body['ruleId'] = str(uuid.uuid4()) Loading Loading @@ -96,10 +136,7 @@ def create_new_rule(body): "detail": "Invalid endsAt format." "detail": "Invalid endsAt format." }, 400 }, 400 # 6. Set 'updatedBy' metadata from providerSelector body['createdBy'] = cn # ps = body.get('providerSelector', {}) # body['updatedBy'] = ps.get('createdByUser', 'system') # This prevents a user from putting "User_A" in the JSON while using "User_B" certificate body['updatedBy'] = cn body['updatedBy'] = cn # Save to MongoDB # Save to MongoDB Loading @@ -111,69 +148,214 @@ def create_new_rule(body): return body, 201 return body, 201 def get_rule_by_id(rule_id): def get_rule_by_id(rule_id): """ Retrieve a specific visibility rule by its ID. - Superadmin: Can view any rule. - Providers: Can view rules they own (userName) or created (updatedBy). """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") cn = request.user_cn # 1. Fetch the rule from the database # We exclude the MongoDB internal _id field immediately rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # if rule: # return rule, 200 # return {"title": "Not Found", "detail": "Rule not found"}, 404 if not rule: if not rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # SECURITY CHECK: Only Superadmin or the actual owner (CN) can view # 2. Authorization Check: Superadmin bypass if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: if cn == "superadmin": return {"title": "Unauthorized", "detail": "You do not have permission to view this rule"}, 401 return rule, 200 # 3. Identity Translation for Providers # Link the certificate CN to the registered Friendly Username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # 4. Permission Validation # is_owner: Checks the logical owner in the rule (userName) # is_creator: Checks the cryptographic signer (updatedBy) is_owner = rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = rule.get('updatedBy') == cn rule.pop('_id', None) if is_owner or is_creator: return rule, 200 return rule, 200 # 5. Deny access if the requester is neither the owner nor the creator return { "title": "Unauthorized", "detail": "You do not have permission to view this rule" }, 401 # def get_rule_by_id(rule_id): # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # # if rule: # # return rule, 200 # # return {"title": "Not Found", "detail": "Rule not found"}, 404 # if not rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # SECURITY CHECK: Only Superadmin or the actual owner (CN) can view # if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to view this rule"}, 401 # rule.pop('_id', None) # return rule, 200 # def delete_rule_by_id(rule_id): # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # # 1. Fetch the rule first to check ownership # rule = col.find_one({"ruleId": rule_id}) # if not rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # 2. SECURITY CHECK: Only Superadmin or the actual owner (CN) can delete # # We compare the current certificate CN against the stored 'updatedBy' # if request.user_cn != "superadmin" and rule.get('userName') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to delete this rule"}, 401 # # 3. Perform the actual deletion # res = col.delete_one({"ruleId": rule_id}) # # 4. Final check on the operation result # if res.deleted_count > 0: # return None, 204 # return {"title": "Not Found", "detail": "Rule not found"}, 404 def delete_rule_by_id(rule_id): def delete_rule_by_id(rule_id): """ Delete a specific visibility rule after verifying ownership. - Superadmin: Can delete any rule. - Providers: Can only delete rules assigned to them or created by them. """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") # 1. Fetch the rule first to check ownership cn = request.user_cn # 1. Retrieve the rule to check metadata rule = col.find_one({"ruleId": rule_id}) rule = col.find_one({"ruleId": rule_id}) if not rule: if not rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # 2. SECURITY CHECK: Only Superadmin or the actual owner (CN) can delete # 2. Authorization Check: Superadmin bypass # We compare the current certificate CN against the stored 'updatedBy' if cn == "superadmin": if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: col.delete_one({"ruleId": rule_id}) return {"title": "Unauthorized", "detail": "You do not have permission to delete this rule"}, 401 return None, 204 # 3. Identity Translation for Providers # Resolve the certificate ID to the registered friendly username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # 3. Perform the actual deletion # 4. Permissions Validation res = col.delete_one({"ruleId": rule_id}) # is_owner: Checks if the rule's userName matches the provider's registered name. # is_creator: Checks if the rule was signed by the current certificate ID. is_owner = rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = rule.get('updatedBy') == cn # 4. Final check on the operation result if is_owner or is_creator: res = col.delete_one({"ruleId": rule_id}) if res.deleted_count > 0: if res.deleted_count > 0: return None, 204 return None, 204 return {"title": "Not Found", "detail": "Rule not found"}, 404 # 5. Deny access if no ownership is proven return { "title": "Unauthorized", "detail": "You do not have permission to delete this rule" }, 401 # def update_rule_patch(rule_id, body): # def update_rule_patch(rule_id, body): # db = get_mongo() # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # # Fetch existing rule for comparison # existing_rule = col.find_one({"ruleId": rule_id}) # if not existing_rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # SECURITY CHECK: Only Superadmin or the actual owner (CN) can update # if request.user_cn != "superadmin" and existing_rule.get('userName') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to modify this rule"}, 401 # # Always update 'updatedAt' timestamp # now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') # body['updatedAt'] = now # # Re-validate date logic if dates are being modified # new_start = body.get('startsAt', existing_rule.get('startsAt')) # new_end = body.get('endsAt', existing_rule.get('endsAt')) # if new_start and new_end: # try: # s = datetime.fromisoformat(new_start.replace('Z', '+00:00')) # e = datetime.fromisoformat(new_end.replace('Z', '+00:00')) # if e <= s: # return { # "title": "Bad Request", # "detail": "Validation Error: endsAt must be later than startsAt" # }, 400 # except ValueError: # return {"title": "Bad Request", "detail": "Invalid date format."}, 400 # # # Update metadata if user info is provided # # if 'providerSelector' in body and 'createdByUser' in body['providerSelector']: # # body['updatedBy'] = body['providerSelector']['createdByUser'] # body['updatedBy'] = cn # col.update_one({"ruleId": rule_id}, {"$set": body}) # col.update_one({"ruleId": rule_id}, {"$set": body}) # updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # return updated_rule, 200 # return updated_rule, 200 def update_rule_patch(rule_id, body): def update_rule_patch(rule_id, body): """ Update a specific visibility rule using PATCH logic. - Superadmin: Can modify any rule. - Providers: Can only modify rules they own or created. """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") cn = request.user_cn # Fetch existing rule for comparison # 1. Fetch existing rule to verify existence and check ownership existing_rule = col.find_one({"ruleId": rule_id}) existing_rule = col.find_one({"ruleId": rule_id}) if not existing_rule: if not existing_rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # SECURITY CHECK: Only Superadmin or the actual owner (CN) can update # 2. Authorization Check: Superadmin bypass if request.user_cn != "superadmin" and existing_rule.get('updatedBy') != request.user_cn: if cn != "superadmin": return {"title": "Unauthorized", "detail": "You do not have permission to modify this rule"}, 401 # Resolve Certificate CN to the registered Friendly Username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # Ownership Validation: # - Check if the rule's userName matches the provider's registered name # - OR check if the rule was last updated/created by this specific certificate is_owner = existing_rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = existing_rule.get('updatedBy') == cn if not (is_owner or is_creator): return { "title": "Unauthorized", "detail": "You do not have permission to modify this rule" }, 401 # Always update 'updatedAt' timestamp # 3. Metadata Updates now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') body['updatedAt'] = now body['updatedAt'] = now body['updatedBy'] = cn # Track who performed the update # Re-validate date logic if dates are being modified # 4. Date Logic Validation # Ensure startsAt is earlier than endsAt, even if only one is being updated new_start = body.get('startsAt', existing_rule.get('startsAt')) new_start = body.get('startsAt', existing_rule.get('startsAt')) new_end = body.get('endsAt', existing_rule.get('endsAt')) new_end = body.get('endsAt', existing_rule.get('endsAt')) Loading @@ -189,10 +371,10 @@ def update_rule_patch(rule_id, body): except ValueError: except ValueError: return {"title": "Bad Request", "detail": "Invalid date format."}, 400 return {"title": "Bad Request", "detail": "Invalid date format."}, 400 # Update metadata if user info is provided # 5. Apply changes to Database if 'providerSelector' in body and 'createdByUser' in body['providerSelector']: # We use $set to only modify the fields provided in the PATCH body body['updatedBy'] = body['providerSelector']['createdByUser'] col.update_one({"ruleId": rule_id}, {"$set": body}) col.update_one({"ruleId": rule_id}, {"$set": body}) # Return the fully updated object (excluding Mongo's internal _id) updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) return updated_rule, 200 return updated_rule, 200 No newline at end of file services/helper/helper_service/services/visibility_control/openapi/openapi.yaml +10 −10 Original line number Original line Diff line number Diff line Loading @@ -6,7 +6,7 @@ info: (discovery) and whether invokers are allowed to create a security context to access them. (discovery) and whether invokers are allowed to create a security context to access them. - Rules are global and evaluated with "more specific wins" precedence. - Rules are global and evaluated with "more specific wins" precedence. - If no rule matches, the decision uses OpenCAPIF's global default (outside this API). - If no rule matches, the decision uses OpenCAPIF's global default (outside this API). - Provider selector is mandatory in rules and must contain at least one selector field (createdByUser is mandatory). - Provider selector is mandatory in rules and must contain at least one selector field (userName is mandatory). title: OpenCAPIF Visibility Control title: OpenCAPIF Visibility Control version: 1.0.0 version: 1.0.0 # servers: # servers: Loading Loading @@ -88,7 +88,7 @@ paths: allow_except_some_invokers: allow_except_some_invokers: value: value: providerSelector: providerSelector: createdByUser: userA userName: userA apiProviderId: apiProviderId: - capif-prov-01 - capif-prov-01 - capif-prov-02 - capif-prov-02 Loading Loading @@ -260,7 +260,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -375,7 +375,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -488,7 +488,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading @@ -496,9 +496,9 @@ components: - apiId - apiId - apiId - apiId properties: properties: createdByUser: userName: minLength: 1 minLength: 1 title: createdByUser title: userName type: string type: string apiProviderId: apiProviderId: items: items: Loading Loading @@ -529,7 +529,7 @@ components: type: array type: array uniqueItems: true uniqueItems: true required: required: - createdByUser - userName title: ProviderSelector title: ProviderSelector type: object type: object InvokerSelector: InvokerSelector: Loading Loading @@ -1876,7 +1876,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -1905,7 +1905,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading
.gitignore +5 −1 Original line number Original line Diff line number Diff line Loading @@ -37,4 +37,8 @@ results helm/capif/*.lock helm/capif/*.lock helm/capif/charts/tempo* helm/capif/charts/tempo* *.bakresults/ .venv/ tests/features/Helper/Visibility Control Api/*.html tests/features/Helper/Visibility Control Api/*.xml *.bak *.bak
services/helper/helper_service/openapi_helper_visibility_control.yaml +3 −3 Original line number Original line Diff line number Diff line Loading @@ -55,7 +55,7 @@ paths: allow_except_some_invokers: allow_except_some_invokers: value: value: providerSelector: providerSelector: createdByUser: "userA" userName: "userA" apiProviderId: [ "capif-prov-01", "capif-prov-02" ] apiProviderId: [ "capif-prov-01", "capif-prov-02" ] apiName: [ "apiName-001" ] apiName: [ "apiName-001" ] apiId: [ "apiId-001" ] apiId: [ "apiId-001" ] Loading Loading @@ -293,9 +293,9 @@ components: Provider-side selector. Arrays apply OR within the field; AND across fields. Provider-side selector. Arrays apply OR within the field; AND across fields. At least one of these fields must be present. At least one of these fields must be present. required: required: - createdByUser - userName properties: properties: createdByUser: userName: type: string type: string minLength: 1 minLength: 1 apiProviderId: apiProviderId: Loading
services/helper/helper_service/services/visibility_control/auth.py +0 −27 Original line number Original line Diff line number Diff line Loading @@ -34,30 +34,3 @@ def cert_validation(): return __cert_validation return __cert_validation return _cert_validation return _cert_validation # def cert_validation(): # def _cert_validation(f): # @wraps(f) # def __cert_validation(*args, **kwargs): # # 1. Validación de existencia de certificado (Literal del ejemplo) # cert_tmp = request.headers['X-Ssl-Client-Cert'] # cert_raw = cert_tmp.replace('\\t', '') # cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) # cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() # # Guardamos para que el Core los vea # request.user_cn = cn # request.cert_signature = cert.signature.hex() # if cn != "superadmin": # cert_signature = cert.signature.hex() # # Aquí, si hay un ID en la URL (como rule_id), podríamos validar. # # Pero para el POST, como el ID está en el cuerpo, # # simplemente dejamos que el Core lo gestione para no romper el wrap. # # Si quieres que el wrap sea universal, lo dejamos pasar tras validar el CN # pass # return f(**kwargs) # return __cert_validation # return _cert_validation No newline at end of file
services/helper/helper_service/services/visibility_control/core/visibility_control_core.py +233 −51 Original line number Original line Diff line number Diff line Loading @@ -8,23 +8,63 @@ from visibility_control.core.validate_user import ControlAccess valid_user = ControlAccess() valid_user = ControlAccess() # def get_all_rules(): # db = get_mongo() # # Usamos la colección configurada en el helper # col = db.get_col_by_name("visibility_rules") # rules = list(col.find({}, {"_id": 0})) # return {"rules": rules}, 200 # def get_all_rules(): # """ # Retrieve visibility rules. # - If superadmin: Returns all rules without filtering. # - If provider: Returns only rules matching the certificate identity (CN). # """ # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # # Safely retrieve identity from request object # cn = request.user_cn # # Professional logic tree: # # 1. Bypass security filters for Superadmin # if cn != "superadmin": # rules = list(col.find({"userName": cn}, {"_id": 0})) # return {"rules": rules}, 200 # rules = list(col.find({}, {"_id": 0})) # return {"rules": rules}, 200 def get_all_rules(): def get_all_rules(): db = get_mongo() db = get_mongo() # Usamos la colección configurada en el helper col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") rules = list(col.find({}, {"_id": 0})) return {"rules": rules}, 200 # def create_new_rule(body): # The ID from the certificate (e.g., AMFe9d24...) # db = get_mongo() cn = getattr(request, 'user_cn', None) # col = db.get_col_by_name("visibility_rules") # # Generamos el ruleId (Server generates it, según tu comentario) # 1. Superadmin: No filters, returns everything # body['ruleId'] = str(uuid.uuid4()) if cn != "superadmin": # We look into CAPIF's provider registration to find the friendly name # assigned to this specific Certificate ID (CN) prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # The query uses an $or operator to ensure visibility: # - Rules where I am the owner (friendly_name) # - Rules I created or updated myself (cn) query_conditions = [{"updatedBy": cn}] if friendly_name: query_conditions.append({"providerSelector.userName": friendly_name}) rules = list(col.find({"$or": query_conditions}, {"_id": 0})) return {"rules": rules}, 200 rules = list(col.find({}, {"_id": 0})) return {"rules": rules}, 200 # col.insert_one(body) # body.pop('_id', None) # return body, 201 def create_new_rule(body): def create_new_rule(body): db = get_mongo() db = get_mongo() Loading @@ -40,20 +80,20 @@ def create_new_rule(body): # We check apiProviderId if it exists, but we focus on the mandatory identity # We check apiProviderId if it exists, but we focus on the mandatory identity api_id = ps.get('apiProviderId', [None])[0] api_id = ps.get('apiProviderId', [None])[0] created_by = ps.get('createdByUser') user_name = ps.get('userName') # Use the available ID to validate ownership via certificate signature # Use the available ID to validate ownership via certificate signature # We prioritize apiProviderId, then createdByUser as fallback for validation # We prioritize apiProviderId, then userName as fallback for validation user_to_validate = api_id if api_id else created_by user_to_validate = api_id if api_id else user_name if user_to_validate: if user_to_validate: result = valid_user.validate_user_cert(user_to_validate, cert_sig) result = valid_user.validate_user_cert(user_to_validate, cert_sig) if result is not None: if result is not None: return result return result else: else: # If even createdByUser is missing (despite being mandatory in your logic), # If even userName is missing (despite being mandatory in your logic), # we block it or handle it as a Bad Request # we block it or handle it as a Bad Request return {"title": "Bad Request", "detail": "createdByUser is mandatory"}, 400 return {"title": "Bad Request", "detail": "userName is mandatory"}, 400 # 1. Generate a unique ruleId # 1. Generate a unique ruleId body['ruleId'] = str(uuid.uuid4()) body['ruleId'] = str(uuid.uuid4()) Loading Loading @@ -96,10 +136,7 @@ def create_new_rule(body): "detail": "Invalid endsAt format." "detail": "Invalid endsAt format." }, 400 }, 400 # 6. Set 'updatedBy' metadata from providerSelector body['createdBy'] = cn # ps = body.get('providerSelector', {}) # body['updatedBy'] = ps.get('createdByUser', 'system') # This prevents a user from putting "User_A" in the JSON while using "User_B" certificate body['updatedBy'] = cn body['updatedBy'] = cn # Save to MongoDB # Save to MongoDB Loading @@ -111,69 +148,214 @@ def create_new_rule(body): return body, 201 return body, 201 def get_rule_by_id(rule_id): def get_rule_by_id(rule_id): """ Retrieve a specific visibility rule by its ID. - Superadmin: Can view any rule. - Providers: Can view rules they own (userName) or created (updatedBy). """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") cn = request.user_cn # 1. Fetch the rule from the database # We exclude the MongoDB internal _id field immediately rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # if rule: # return rule, 200 # return {"title": "Not Found", "detail": "Rule not found"}, 404 if not rule: if not rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # SECURITY CHECK: Only Superadmin or the actual owner (CN) can view # 2. Authorization Check: Superadmin bypass if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: if cn == "superadmin": return {"title": "Unauthorized", "detail": "You do not have permission to view this rule"}, 401 return rule, 200 # 3. Identity Translation for Providers # Link the certificate CN to the registered Friendly Username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # 4. Permission Validation # is_owner: Checks the logical owner in the rule (userName) # is_creator: Checks the cryptographic signer (updatedBy) is_owner = rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = rule.get('updatedBy') == cn rule.pop('_id', None) if is_owner or is_creator: return rule, 200 return rule, 200 # 5. Deny access if the requester is neither the owner nor the creator return { "title": "Unauthorized", "detail": "You do not have permission to view this rule" }, 401 # def get_rule_by_id(rule_id): # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # # if rule: # # return rule, 200 # # return {"title": "Not Found", "detail": "Rule not found"}, 404 # if not rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # SECURITY CHECK: Only Superadmin or the actual owner (CN) can view # if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to view this rule"}, 401 # rule.pop('_id', None) # return rule, 200 # def delete_rule_by_id(rule_id): # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # # 1. Fetch the rule first to check ownership # rule = col.find_one({"ruleId": rule_id}) # if not rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # 2. SECURITY CHECK: Only Superadmin or the actual owner (CN) can delete # # We compare the current certificate CN against the stored 'updatedBy' # if request.user_cn != "superadmin" and rule.get('userName') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to delete this rule"}, 401 # # 3. Perform the actual deletion # res = col.delete_one({"ruleId": rule_id}) # # 4. Final check on the operation result # if res.deleted_count > 0: # return None, 204 # return {"title": "Not Found", "detail": "Rule not found"}, 404 def delete_rule_by_id(rule_id): def delete_rule_by_id(rule_id): """ Delete a specific visibility rule after verifying ownership. - Superadmin: Can delete any rule. - Providers: Can only delete rules assigned to them or created by them. """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") # 1. Fetch the rule first to check ownership cn = request.user_cn # 1. Retrieve the rule to check metadata rule = col.find_one({"ruleId": rule_id}) rule = col.find_one({"ruleId": rule_id}) if not rule: if not rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # 2. SECURITY CHECK: Only Superadmin or the actual owner (CN) can delete # 2. Authorization Check: Superadmin bypass # We compare the current certificate CN against the stored 'updatedBy' if cn == "superadmin": if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn: col.delete_one({"ruleId": rule_id}) return {"title": "Unauthorized", "detail": "You do not have permission to delete this rule"}, 401 return None, 204 # 3. Identity Translation for Providers # Resolve the certificate ID to the registered friendly username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # 3. Perform the actual deletion # 4. Permissions Validation res = col.delete_one({"ruleId": rule_id}) # is_owner: Checks if the rule's userName matches the provider's registered name. # is_creator: Checks if the rule was signed by the current certificate ID. is_owner = rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = rule.get('updatedBy') == cn # 4. Final check on the operation result if is_owner or is_creator: res = col.delete_one({"ruleId": rule_id}) if res.deleted_count > 0: if res.deleted_count > 0: return None, 204 return None, 204 return {"title": "Not Found", "detail": "Rule not found"}, 404 # 5. Deny access if no ownership is proven return { "title": "Unauthorized", "detail": "You do not have permission to delete this rule" }, 401 # def update_rule_patch(rule_id, body): # def update_rule_patch(rule_id, body): # db = get_mongo() # db = get_mongo() # col = db.get_col_by_name("visibility_rules") # col = db.get_col_by_name("visibility_rules") # cn = request.user_cn # # Fetch existing rule for comparison # existing_rule = col.find_one({"ruleId": rule_id}) # if not existing_rule: # return {"title": "Not Found", "detail": "Rule not found"}, 404 # # SECURITY CHECK: Only Superadmin or the actual owner (CN) can update # if request.user_cn != "superadmin" and existing_rule.get('userName') != request.user_cn: # return {"title": "Unauthorized", "detail": "You do not have permission to modify this rule"}, 401 # # Always update 'updatedAt' timestamp # now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') # body['updatedAt'] = now # # Re-validate date logic if dates are being modified # new_start = body.get('startsAt', existing_rule.get('startsAt')) # new_end = body.get('endsAt', existing_rule.get('endsAt')) # if new_start and new_end: # try: # s = datetime.fromisoformat(new_start.replace('Z', '+00:00')) # e = datetime.fromisoformat(new_end.replace('Z', '+00:00')) # if e <= s: # return { # "title": "Bad Request", # "detail": "Validation Error: endsAt must be later than startsAt" # }, 400 # except ValueError: # return {"title": "Bad Request", "detail": "Invalid date format."}, 400 # # # Update metadata if user info is provided # # if 'providerSelector' in body and 'createdByUser' in body['providerSelector']: # # body['updatedBy'] = body['providerSelector']['createdByUser'] # body['updatedBy'] = cn # col.update_one({"ruleId": rule_id}, {"$set": body}) # col.update_one({"ruleId": rule_id}, {"$set": body}) # updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) # return updated_rule, 200 # return updated_rule, 200 def update_rule_patch(rule_id, body): def update_rule_patch(rule_id, body): """ Update a specific visibility rule using PATCH logic. - Superadmin: Can modify any rule. - Providers: Can only modify rules they own or created. """ db = get_mongo() db = get_mongo() col = db.get_col_by_name("visibility_rules") col = db.get_col_by_name("visibility_rules") cn = request.user_cn # Fetch existing rule for comparison # 1. Fetch existing rule to verify existence and check ownership existing_rule = col.find_one({"ruleId": rule_id}) existing_rule = col.find_one({"ruleId": rule_id}) if not existing_rule: if not existing_rule: return {"title": "Not Found", "detail": "Rule not found"}, 404 return {"title": "Not Found", "detail": "Rule not found"}, 404 # SECURITY CHECK: Only Superadmin or the actual owner (CN) can update # 2. Authorization Check: Superadmin bypass if request.user_cn != "superadmin" and existing_rule.get('updatedBy') != request.user_cn: if cn != "superadmin": return {"title": "Unauthorized", "detail": "You do not have permission to modify this rule"}, 401 # Resolve Certificate CN to the registered Friendly Username prov_col = db.get_col_by_name("provider_details") provider = prov_col.find_one({"apiProvFuncs.apiProvFuncId": cn}) friendly_name = provider.get('userName') if provider else None # Ownership Validation: # - Check if the rule's userName matches the provider's registered name # - OR check if the rule was last updated/created by this specific certificate is_owner = existing_rule.get('providerSelector', {}).get('userName') == friendly_name is_creator = existing_rule.get('updatedBy') == cn if not (is_owner or is_creator): return { "title": "Unauthorized", "detail": "You do not have permission to modify this rule" }, 401 # Always update 'updatedAt' timestamp # 3. Metadata Updates now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') body['updatedAt'] = now body['updatedAt'] = now body['updatedBy'] = cn # Track who performed the update # Re-validate date logic if dates are being modified # 4. Date Logic Validation # Ensure startsAt is earlier than endsAt, even if only one is being updated new_start = body.get('startsAt', existing_rule.get('startsAt')) new_start = body.get('startsAt', existing_rule.get('startsAt')) new_end = body.get('endsAt', existing_rule.get('endsAt')) new_end = body.get('endsAt', existing_rule.get('endsAt')) Loading @@ -189,10 +371,10 @@ def update_rule_patch(rule_id, body): except ValueError: except ValueError: return {"title": "Bad Request", "detail": "Invalid date format."}, 400 return {"title": "Bad Request", "detail": "Invalid date format."}, 400 # Update metadata if user info is provided # 5. Apply changes to Database if 'providerSelector' in body and 'createdByUser' in body['providerSelector']: # We use $set to only modify the fields provided in the PATCH body body['updatedBy'] = body['providerSelector']['createdByUser'] col.update_one({"ruleId": rule_id}, {"$set": body}) col.update_one({"ruleId": rule_id}, {"$set": body}) # Return the fully updated object (excluding Mongo's internal _id) updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) updated_rule = col.find_one({"ruleId": rule_id}, {"_id": 0}) return updated_rule, 200 return updated_rule, 200 No newline at end of file
services/helper/helper_service/services/visibility_control/openapi/openapi.yaml +10 −10 Original line number Original line Diff line number Diff line Loading @@ -6,7 +6,7 @@ info: (discovery) and whether invokers are allowed to create a security context to access them. (discovery) and whether invokers are allowed to create a security context to access them. - Rules are global and evaluated with "more specific wins" precedence. - Rules are global and evaluated with "more specific wins" precedence. - If no rule matches, the decision uses OpenCAPIF's global default (outside this API). - If no rule matches, the decision uses OpenCAPIF's global default (outside this API). - Provider selector is mandatory in rules and must contain at least one selector field (createdByUser is mandatory). - Provider selector is mandatory in rules and must contain at least one selector field (userName is mandatory). title: OpenCAPIF Visibility Control title: OpenCAPIF Visibility Control version: 1.0.0 version: 1.0.0 # servers: # servers: Loading Loading @@ -88,7 +88,7 @@ paths: allow_except_some_invokers: allow_except_some_invokers: value: value: providerSelector: providerSelector: createdByUser: userA userName: userA apiProviderId: apiProviderId: - capif-prov-01 - capif-prov-01 - capif-prov-02 - capif-prov-02 Loading Loading @@ -260,7 +260,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -375,7 +375,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -488,7 +488,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading @@ -496,9 +496,9 @@ components: - apiId - apiId - apiId - apiId properties: properties: createdByUser: userName: minLength: 1 minLength: 1 title: createdByUser title: userName type: string type: string apiProviderId: apiProviderId: items: items: Loading Loading @@ -529,7 +529,7 @@ components: type: array type: array uniqueItems: true uniqueItems: true required: required: - createdByUser - userName title: ProviderSelector title: ProviderSelector type: object type: object InvokerSelector: InvokerSelector: Loading Loading @@ -1876,7 +1876,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading Loading @@ -1905,7 +1905,7 @@ components: apiName: apiName: - apiName - apiName - apiName - apiName createdByUser: createdByUser userName: userName aefId: aefId: - aefId - aefId - aefId - aefId Loading