Loading services/TS29222_CAPIF_Auditing_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ mongo: { 'db': 'capif', 'logs_col': 'invocationlogs', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading services/TS29222_CAPIF_Auditing_API/logs/controllers/default_controller.py +32 −0 Original line number Diff line number Diff line Loading @@ -3,13 +3,45 @@ from logs import util from logs.models.interface_description import InterfaceDescription # noqa: E501 from logs.models.operation import Operation # noqa: E501 from logs.models.protocol import Protocol # noqa: E501 from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.auditoperations import AuditOperations from ..core.responses import bad_request_error from ..core.validate_user import ControlAccess audit_operations = AuditOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() result = valid_user.validate_user_cert(cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def api_invocation_logs_get(aef_id=None, api_invoker_id=None, time_range_start=None, time_range_end=None, api_id=None, api_name=None, api_version=None, protocol=None, operation=None, result=None, resource_name=None, src_interface=None, dest_interface=None, supported_features=None): # noqa: E501 """api_invocation_logs_get Loading services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py 0 → 100644 +31 −0 Original line number Diff line number Diff line import json from flask import Response, current_app from ..encoder import CustomJSONEncoder from ..models.problem_details import ProblemDetails from ..util import serialize_clean_camel_case from .resources import Resource from .responses import internal_server_error class ControlAccess(Resource): def validate_user_cert(self, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["role"] != "AMF": prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) No newline at end of file services/TS29222_CAPIF_Auditing_API/logs/db/db.py +1 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ class MongoDatabse(): self.db = self.__connect() self.invocation_logs = self.config['mongo']['logs_col'] self.capif_users = self.config['mongo']['capif_users_col'] self.certs_col = self.config['mongo']['certs_col'] def get_col_by_name(self, name): return self.db[name] Loading services/TS29222_CAPIF_Discover_Service_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mongo: { 'col': 'serviceapidescriptions', 'invokers_col': 'invokerdetails', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading Loading
services/TS29222_CAPIF_Auditing_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ mongo: { 'db': 'capif', 'logs_col': 'invocationlogs', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading
services/TS29222_CAPIF_Auditing_API/logs/controllers/default_controller.py +32 −0 Original line number Diff line number Diff line Loading @@ -3,13 +3,45 @@ from logs import util from logs.models.interface_description import InterfaceDescription # noqa: E501 from logs.models.operation import Operation # noqa: E501 from logs.models.protocol import Protocol # noqa: E501 from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.auditoperations import AuditOperations from ..core.responses import bad_request_error from ..core.validate_user import ControlAccess audit_operations = AuditOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() result = valid_user.validate_user_cert(cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def api_invocation_logs_get(aef_id=None, api_invoker_id=None, time_range_start=None, time_range_end=None, api_id=None, api_name=None, api_version=None, protocol=None, operation=None, result=None, resource_name=None, src_interface=None, dest_interface=None, supported_features=None): # noqa: E501 """api_invocation_logs_get Loading
services/TS29222_CAPIF_Auditing_API/logs/core/validate_user.py 0 → 100644 +31 −0 Original line number Diff line number Diff line import json from flask import Response, current_app from ..encoder import CustomJSONEncoder from ..models.problem_details import ProblemDetails from ..util import serialize_clean_camel_case from .resources import Resource from .responses import internal_server_error class ControlAccess(Resource): def validate_user_cert(self, cert_signature): cert_col = self.db.get_col_by_name(self.db.certs_col) try: my_query = {'cert_signature': cert_signature} cert_entry = cert_col.find_one(my_query) if cert_entry is not None: if cert_entry["role"] != "AMF": prob = ProblemDetails(title="Unauthorized", detail="User not authorized", cause="You are not the owner of this resource") prob = serialize_clean_camel_case(prob) return Response(json.dumps(prob, cls=CustomJSONEncoder), status=401, mimetype="application/json") except Exception as e: exception = "An exception occurred in validate invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) No newline at end of file
services/TS29222_CAPIF_Auditing_API/logs/db/db.py +1 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ class MongoDatabse(): self.db = self.__connect() self.invocation_logs = self.config['mongo']['logs_col'] self.capif_users = self.config['mongo']['capif_users_col'] self.certs_col = self.config['mongo']['certs_col'] def get_col_by_name(self, name): return self.db[name] Loading
services/TS29222_CAPIF_Discover_Service_API/config.yaml +1 −0 Original line number Diff line number Diff line Loading @@ -5,6 +5,7 @@ mongo: { 'col': 'serviceapidescriptions', 'invokers_col': 'invokerdetails', 'capif_users_col': "user", 'certs_col': "certs", 'host': 'mongo', 'port': "27017" } Loading