Commit 8dfd230e authored by Claudia Carballo Gonzalez's avatar Claudia Carballo Gonzalez
Browse files

add client certificate validation and API authorization logic

parent 060e2bf8
Loading
Loading
Loading
Loading
Loading
+63 −0
Original line number Original line Diff line number Diff line
from functools import wraps
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from flask import request
import connexion
#from ..core.validate_user import ControlAccess

from visibility_control.core.validate_user import ControlAccess
valid_user = ControlAccess()

def cert_validation():
    def _cert_validation(f):
        @wraps(f)
        def __cert_validation(*args, **kwargs):
            # 1. Get certificate header safely
            cert_tmp = request.headers.get('X-Ssl-Client-Cert')
            
            if not cert_tmp:
                return {"title": "Unauthorized", "detail": "Certificate header missing"}, 401

            try:
                # 2. Process certificate
                cert_raw = cert_tmp.replace('\\t', '')
                cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend())
                cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip()

                # 3. Store identity for the Core logic
                request.user_cn = cn
                request.cert_signature = cert.signature.hex()

                return f(**kwargs)
            except Exception:
                return {"title": "Unauthorized", "detail": "Invalid certificate format"}, 401
        return __cert_validation
    return _cert_validation

# def cert_validation():
#     def _cert_validation(f):
#         @wraps(f)
#         def __cert_validation(*args, **kwargs):
#             # 1. Validación de existencia de certificado (Literal del ejemplo)
#             cert_tmp = request.headers['X-Ssl-Client-Cert']
            
#             cert_raw = cert_tmp.replace('\\t', '')
#             cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend())
#             cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip()

#             # Guardamos para que el Core los vea
#             request.user_cn = cn
#             request.cert_signature = cert.signature.hex()

#             if cn != "superadmin":
#                 cert_signature = cert.signature.hex()
#                 # Aquí, si hay un ID en la URL (como rule_id), podríamos validar.
#                 # Pero para el POST, como el ID está en el cuerpo, 
#                 # simplemente dejamos que el Core lo gestione para no romper el wrap.
                
#                 # Si quieres que el wrap sea universal, lo dejamos pasar tras validar el CN
#                 pass 

#             return f(**kwargs)
#         return __cert_validation
#     return _cert_validation
 No newline at end of file
+10 −1
Original line number Original line Diff line number Diff line
import connexion
import connexion
from typing import Dict, Tuple, Union
from typing import Dict, Tuple, Union


# Importamos la lógica del CORE (Asegúrate de crear este archivo después)
#from ..auth import cert_validation

from visibility_control.auth import cert_validation

# Importamos la lógica del CORE 
from ..core.visibility_control_core import (
from ..core.visibility_control_core import (
    get_all_rules, 
    get_all_rules, 
    create_new_rule, 
    create_new_rule, 
@@ -17,10 +21,12 @@ from visibility_control.models.rule_patch_request import RulePatchRequest
from visibility_control.models.rules_get200_response import RulesGet200Response
from visibility_control.models.rules_get200_response import RulesGet200Response
from visibility_control import util
from visibility_control import util


@cert_validation()
def rules_get():
def rules_get():
    """List rules"""
    """List rules"""
    return get_all_rules()
    return get_all_rules()


@cert_validation()
def rules_post(body): 
def rules_post(body): 
    """
    """
    Create a rule
    Create a rule
@@ -33,14 +39,17 @@ def rules_post(body):
        
        
    return Error(title="Bad Request", detail="JSON body required", status=400), 400
    return Error(title="Bad Request", detail="JSON body required", status=400), 400


@cert_validation()
def rules_rule_id_delete(rule_id):
def rules_rule_id_delete(rule_id):
    """Delete a rule"""
    """Delete a rule"""
    return delete_rule_by_id(rule_id)
    return delete_rule_by_id(rule_id)


@cert_validation()
def rules_rule_id_get(rule_id):
def rules_rule_id_get(rule_id):
    """Get a rule"""
    """Get a rule"""
    return get_rule_by_id(rule_id)
    return get_rule_by_id(rule_id)


@cert_validation()
def rules_rule_id_patch(rule_id, rule_patch_request):
def rules_rule_id_patch(rule_id, rule_patch_request):
    """Update a rule (partial)"""
    """Update a rule (partial)"""
    if connexion.request.is_json:
    if connexion.request.is_json:
+31 −0
Original line number Original line Diff line number Diff line
import json
from flask import Response, current_app

class Resource:
    def __init__(self):
        # We use the existing mongo helper from your service
        from db.db import get_mongo
        self.db = get_mongo()

class ControlAccess(Resource):
    def validate_user_cert(self, api_provider_id, cert_signature):
        # Access the certificates collection in CAPIF database
        cert_col = self.db.get_col_by_name("certs")
        try:
            # Check if provider_id matches the certificate signature
            my_query = {'provider_id': api_provider_id}
            cert_entry = cert_col.find_one(my_query)

            if cert_entry is not None:
                if cert_entry["cert_signature"] != cert_signature:
                    # Return 401 if signatures don't match
                    prob = {
                        "title": "Unauthorized", 
                        "detail": "User not authorized", 
                        "cause": "You are not the owner of this resource"
                    }
                    return Response(json.dumps(prob), status=401, mimetype="application/json")
            return None
        except Exception as e:
            current_app.logger.error("Error in validate_user_cert: " + str(e))
            return Response(json.dumps({"title": "Internal Server Error", "detail": str(e)}), status=500, mimetype="application/json")
 No newline at end of file
+64 −5
Original line number Original line Diff line number Diff line
@@ -3,6 +3,11 @@ from datetime import datetime, timezone
from db.db import get_mongo
from db.db import get_mongo
from config import Config
from config import Config


from flask import request
from visibility_control.core.validate_user import ControlAccess

valid_user = ControlAccess()

def get_all_rules():
def get_all_rules():
    db = get_mongo()
    db = get_mongo()
    # Usamos la colección configurada en el helper
    # Usamos la colección configurada en el helper
@@ -25,6 +30,31 @@ def create_new_rule(body):
    db = get_mongo()
    db = get_mongo()
    col = db.get_col_by_name("visibility_rules")
    col = db.get_col_by_name("visibility_rules")


    # Get identity extracted by the decorator
    cn = request.user_cn
    cert_sig = request.cert_signature

    # Security check: If not superadmin, validate the mandatory identity
    if cn != "superadmin":
        ps = body.get('providerSelector', {})
        
        # We check apiProviderId if it exists, but we focus on the mandatory identity
        api_id = ps.get('apiProviderId', [None])[0]
        created_by = ps.get('createdByUser')

        # Use the available ID to validate ownership via certificate signature
        # We prioritize apiProviderId, then createdByUser as fallback for validation
        user_to_validate = api_id if api_id else created_by

        if user_to_validate:
            result = valid_user.validate_user_cert(user_to_validate, cert_sig)
            if result is not None:
                return result
        else:
            # If even createdByUser is missing (despite being mandatory in your logic), 
            # we block it or handle it as a Bad Request
            return {"title": "Bad Request", "detail": "createdByUser is mandatory"}, 400
    
    # 1. Generate a unique ruleId
    # 1. Generate a unique ruleId
    body['ruleId'] = str(uuid.uuid4())
    body['ruleId'] = str(uuid.uuid4())
    
    
@@ -67,8 +97,10 @@ def create_new_rule(body):
            }, 400
            }, 400
    
    
    # 6. Set 'updatedBy' metadata from providerSelector
    # 6. Set 'updatedBy' metadata from providerSelector
    ps = body.get('providerSelector', {})
    # ps = body.get('providerSelector', {})
    body['updatedBy'] = ps.get('createdByUser', 'system')
    # body['updatedBy'] = ps.get('createdByUser', 'system')
    # This prevents a user from putting "User_A" in the JSON while using "User_B" certificate
    body['updatedBy'] = cn 


    # Save to MongoDB
    # Save to MongoDB
    col.insert_one(body)
    col.insert_one(body)
@@ -82,16 +114,39 @@ def get_rule_by_id(rule_id):
    db = get_mongo()
    db = get_mongo()
    col = db.get_col_by_name("visibility_rules")
    col = db.get_col_by_name("visibility_rules")
    rule = col.find_one({"ruleId": rule_id}, {"_id": 0})
    rule = col.find_one({"ruleId": rule_id}, {"_id": 0})
    if rule:
    # if rule:
        return rule, 200
    #     return rule, 200
    # return {"title": "Not Found", "detail": "Rule not found"}, 404
    if not rule:
        return {"title": "Not Found", "detail": "Rule not found"}, 404
        return {"title": "Not Found", "detail": "Rule not found"}, 404


    # SECURITY CHECK: Only Superadmin or the actual owner (CN) can view
    if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn:
        return {"title": "Unauthorized", "detail": "You do not have permission to view this rule"}, 401
    
    rule.pop('_id', None)
    return rule, 200

def delete_rule_by_id(rule_id):
def delete_rule_by_id(rule_id):
    db = get_mongo()
    db = get_mongo()
    col = db.get_col_by_name("visibility_rules")
    col = db.get_col_by_name("visibility_rules")
    # 1. Fetch the rule first to check ownership
    rule = col.find_one({"ruleId": rule_id})
    if not rule:
        return {"title": "Not Found", "detail": "Rule not found"}, 404

    # 2. SECURITY CHECK: Only Superadmin or the actual owner (CN) can delete
    # We compare the current certificate CN against the stored 'updatedBy'
    if request.user_cn != "superadmin" and rule.get('updatedBy') != request.user_cn:
        return {"title": "Unauthorized", "detail": "You do not have permission to delete this rule"}, 401

    # 3. Perform the actual deletion
    res = col.delete_one({"ruleId": rule_id})
    res = col.delete_one({"ruleId": rule_id})
    
    # 4. Final check on the operation result
    if res.deleted_count > 0:
    if res.deleted_count > 0:
        return None, 204
        return None, 204
        
    return {"title": "Not Found", "detail": "Rule not found"}, 404
    return {"title": "Not Found", "detail": "Rule not found"}, 404


# def update_rule_patch(rule_id, body):
# def update_rule_patch(rule_id, body):
@@ -110,6 +165,10 @@ def update_rule_patch(rule_id, body):
    if not existing_rule:
    if not existing_rule:
        return {"title": "Not Found", "detail": "Rule not found"}, 404
        return {"title": "Not Found", "detail": "Rule not found"}, 404
    
    
    # SECURITY CHECK: Only Superadmin or the actual owner (CN) can update
    if request.user_cn != "superadmin" and existing_rule.get('updatedBy') != request.user_cn:
        return {"title": "Unauthorized", "detail": "You do not have permission to modify this rule"}, 401

    # Always update 'updatedAt' timestamp
    # Always update 'updatedAt' timestamp
    now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    body['updatedAt'] = now
    body['updatedAt'] = now