Commit 941d69d3 authored by Stavros-Anastasios Charismiadis's avatar Stavros-Anastasios Charismiadis
Browse files

Add cert_validation in Audit and Logging APIs. Fix some tests according to cert validation

parent fc6dd09a
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ mongo: {
  'db': 'capif',
  'logs_col': 'invocationlogs',
  'capif_users_col': "user",
  'certs_col': "certs",
  'host': 'mongo',
  'port': "27017"
}
+32 −0
Original line number Diff line number Diff line
@@ -3,13 +3,45 @@ from logs import util
from logs.models.interface_description import InterfaceDescription  # noqa: E501
from logs.models.operation import Operation  # noqa: E501
from logs.models.protocol import Protocol  # noqa: E501
from functools import wraps
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from ..core.auditoperations import AuditOperations
from ..core.responses import bad_request_error
from ..core.validate_user import ControlAccess

audit_operations = AuditOperations()
valid_user = ControlAccess()


def cert_validation():
    def _cert_validation(f):
        @wraps(f)
        def __cert_validation(*args, **kwargs):

            args = request.view_args
            cert_tmp = request.headers['X-Ssl-Client-Cert']
            cert_raw = cert_tmp.replace('\t', '')

            cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend())

            cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip()

            if cn != "superadmin":
                cert_signature = cert.signature.hex()
                result = valid_user.validate_user_cert(cert_signature)

                if result is not None:
                    return result

            result = f(**kwargs)
            return result
        return __cert_validation
    return _cert_validation


@cert_validation()
def api_invocation_logs_get(aef_id=None, api_invoker_id=None, time_range_start=None, time_range_end=None, api_id=None, api_name=None, api_version=None, protocol=None, operation=None, result=None, resource_name=None, src_interface=None, dest_interface=None, supported_features=None):  # noqa: E501
    """api_invocation_logs_get

+1 −0
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ class MongoDatabse():
        self.db = self.__connect()
        self.invocation_logs = self.config['mongo']['logs_col']
        self.capif_users = self.config['mongo']['capif_users_col']
        self.certs_col = self.config['mongo']['certs_col']

    def get_col_by_name(self, name):
        return self.db[name]
+3 −0
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ from api_invocation_logs.models.invocation_log import InvocationLog # noqa: E50
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from flask import current_app, request
from functools import wraps

from ..core.invocationlogs import LoggingInvocationOperations
from ..core.validate_user import ControlAccess
@@ -10,6 +11,7 @@ logging_invocation_operations = LoggingInvocationOperations()

valid_user = ControlAccess()


def cert_validation():
    def _cert_validation(f):
        @wraps(f)
@@ -36,6 +38,7 @@ def cert_validation():
    return _cert_validation


@cert_validation()
def aef_id_logs_post(aef_id, body):  # noqa: E501
    """aef_id_logs_post

+1 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ class MongoDatabse():
        self.provider_details = self.config['mongo']['prov_col']
        self.service_apis = self.config['mongo']['serv_col']
        self.capif_users = self.config['mongo']['capif_users_col']
        self.certs_col = self.config['mongo']['certs_col']

    def get_col_by_name(self, name):
        return self.db[name]
Loading