Loading services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/controllers/individual_api_invoker_enrolment_details_controller.py +44 −2 Original line number Diff line number Diff line from flask import current_app, request from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.apiinvokerenrolmentdetails import InvokerManagementOperations from ..core.validate_user import ControlAccess from api_invoker_management.models.api_invoker_enrolment_details_patch import \ APIInvokerEnrolmentDetailsPatch # noqa: E501 invoker_operations = InvokerManagementOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() result = valid_user.validate_user_cert(args["onboardingId"], cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def modify_ind_api_invoke_enrolment(onboarding_id, body): # noqa: E501 """modify_ind_api_invoke_enrolment Loading @@ -14,6 +51,11 @@ def modify_ind_api_invoke_enrolment(onboarding_id, body): # noqa: E501 :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]] """ current_app.logger.info("Updating invoker") if request.is_json: api_invoker_enrolment_details_patch = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json()) # noqa: E501 return 'do some magic!' body = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json()) # noqa: E501 res = invoker_operations.patch_apiinvokerenrolmentdetail(onboarding_id, body) return res services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/apiinvokerenrolmentdetails.py +53 −2 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ from flask import Response, current_app from pymongo import ReturnDocument from ..config import Config from ..util import dict_to_camel_case, serialize_clean_camel_case from ..util import dict_to_camel_case, serialize_clean_camel_case, clean_empty from .auth_manager import AuthManager from .publisher import Publisher from .redis_event import RedisEvent Loading Loading @@ -136,6 +136,57 @@ class InvokerManagementOperations(Resource): self.auth_manager.update_auth_invoker( cert['data']["certificate"], onboard_id) apiinvokerenrolmentdetail.api_invoker_id = onboard_id apiinvokerenrolmentdetail_update = apiinvokerenrolmentdetail.to_dict() apiinvokerenrolmentdetail_update = clean_empty(apiinvokerenrolmentdetail_update) result = mycol.find_one_and_replace(result, apiinvokerenrolmentdetail_update, projection={'_id': 0}, return_document=ReturnDocument.AFTER, upsert=False) current_app.logger.debug("Invoker Resource inserted in database") invoker_updated = APIInvokerEnrolmentDetails().from_dict(dict_to_camel_case(result)) current_app.logger.debug(f"Invoker Updated: {invoker_updated}") res = make_response(object=serialize_clean_camel_case( invoker_updated), status=200) if res.status_code == 200: current_app.logger.info("Invoker Updated") RedisEvent("API_INVOKER_UPDATED", api_invoker_ids=[onboard_id]).send_event() return res except Exception as e: exception = "An exception occurred in update invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def patch_apiinvokerenrolmentdetail(self, onboard_id, apiinvokerenrolmentdetail): mycol = self.db.get_col_by_name(self.db.invoker_enrolment_details) try: current_app.logger.debug("Patching invoker resource") result = self.__check_api_invoker_id(onboard_id) if isinstance(result, Response): return result if apiinvokerenrolmentdetail.onboarding_information: if apiinvokerenrolmentdetail.onboarding_information.api_invoker_public_key != result["onboarding_information"]["api_invoker_public_key"]: cert = self.__sign_cert( apiinvokerenrolmentdetail.onboarding_information.api_invoker_public_key, result["api_invoker_id"]) apiinvokerenrolmentdetail.onboarding_information.api_invoker_certificate = cert[ 'data']['certificate'] self.auth_manager.update_auth_invoker( cert['data']["certificate"], onboard_id) else: apiinvokerenrolmentdetail.onboarding_information.api_invoker_certificate = result["onboarding_information"]["api_invoker_certificate"] apiinvokerenrolmentdetail_update = apiinvokerenrolmentdetail.to_dict() apiinvokerenrolmentdetail_update = { key: value for key, value in apiinvokerenrolmentdetail_update.items() if value is not None Loading @@ -158,7 +209,7 @@ class InvokerManagementOperations(Resource): res = make_response(object=serialize_clean_camel_case( invoker_updated), status=200) if res.status_code == 200: current_app.logger.info("Invoker Updated") current_app.logger.info("Invoker Patched") RedisEvent("API_INVOKER_UPDATED", api_invoker_ids=[onboard_id]).send_event() return res Loading Loading
services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/controllers/individual_api_invoker_enrolment_details_controller.py +44 −2 Original line number Diff line number Diff line from flask import current_app, request from functools import wraps from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.apiinvokerenrolmentdetails import InvokerManagementOperations from ..core.validate_user import ControlAccess from api_invoker_management.models.api_invoker_enrolment_details_patch import \ APIInvokerEnrolmentDetailsPatch # noqa: E501 invoker_operations = InvokerManagementOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate(str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() result = valid_user.validate_user_cert(args["onboardingId"], cert_signature) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def modify_ind_api_invoke_enrolment(onboarding_id, body): # noqa: E501 """modify_ind_api_invoke_enrolment Loading @@ -14,6 +51,11 @@ def modify_ind_api_invoke_enrolment(onboarding_id, body): # noqa: E501 :rtype: Union[APIInvokerEnrolmentDetails, Tuple[APIInvokerEnrolmentDetails, int], Tuple[APIInvokerEnrolmentDetails, int, Dict[str, str]] """ current_app.logger.info("Updating invoker") if request.is_json: api_invoker_enrolment_details_patch = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json()) # noqa: E501 return 'do some magic!' body = APIInvokerEnrolmentDetailsPatch.from_dict(request.get_json()) # noqa: E501 res = invoker_operations.patch_apiinvokerenrolmentdetail(onboarding_id, body) return res
services/TS29222_CAPIF_API_Invoker_Management_API/api_invoker_management/core/apiinvokerenrolmentdetails.py +53 −2 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ from flask import Response, current_app from pymongo import ReturnDocument from ..config import Config from ..util import dict_to_camel_case, serialize_clean_camel_case from ..util import dict_to_camel_case, serialize_clean_camel_case, clean_empty from .auth_manager import AuthManager from .publisher import Publisher from .redis_event import RedisEvent Loading Loading @@ -136,6 +136,57 @@ class InvokerManagementOperations(Resource): self.auth_manager.update_auth_invoker( cert['data']["certificate"], onboard_id) apiinvokerenrolmentdetail.api_invoker_id = onboard_id apiinvokerenrolmentdetail_update = apiinvokerenrolmentdetail.to_dict() apiinvokerenrolmentdetail_update = clean_empty(apiinvokerenrolmentdetail_update) result = mycol.find_one_and_replace(result, apiinvokerenrolmentdetail_update, projection={'_id': 0}, return_document=ReturnDocument.AFTER, upsert=False) current_app.logger.debug("Invoker Resource inserted in database") invoker_updated = APIInvokerEnrolmentDetails().from_dict(dict_to_camel_case(result)) current_app.logger.debug(f"Invoker Updated: {invoker_updated}") res = make_response(object=serialize_clean_camel_case( invoker_updated), status=200) if res.status_code == 200: current_app.logger.info("Invoker Updated") RedisEvent("API_INVOKER_UPDATED", api_invoker_ids=[onboard_id]).send_event() return res except Exception as e: exception = "An exception occurred in update invoker" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def patch_apiinvokerenrolmentdetail(self, onboard_id, apiinvokerenrolmentdetail): mycol = self.db.get_col_by_name(self.db.invoker_enrolment_details) try: current_app.logger.debug("Patching invoker resource") result = self.__check_api_invoker_id(onboard_id) if isinstance(result, Response): return result if apiinvokerenrolmentdetail.onboarding_information: if apiinvokerenrolmentdetail.onboarding_information.api_invoker_public_key != result["onboarding_information"]["api_invoker_public_key"]: cert = self.__sign_cert( apiinvokerenrolmentdetail.onboarding_information.api_invoker_public_key, result["api_invoker_id"]) apiinvokerenrolmentdetail.onboarding_information.api_invoker_certificate = cert[ 'data']['certificate'] self.auth_manager.update_auth_invoker( cert['data']["certificate"], onboard_id) else: apiinvokerenrolmentdetail.onboarding_information.api_invoker_certificate = result["onboarding_information"]["api_invoker_certificate"] apiinvokerenrolmentdetail_update = apiinvokerenrolmentdetail.to_dict() apiinvokerenrolmentdetail_update = { key: value for key, value in apiinvokerenrolmentdetail_update.items() if value is not None Loading @@ -158,7 +209,7 @@ class InvokerManagementOperations(Resource): res = make_response(object=serialize_clean_camel_case( invoker_updated), status=200) if res.status_code == 200: current_app.logger.info("Invoker Updated") current_app.logger.info("Invoker Patched") RedisEvent("API_INVOKER_UPDATED", api_invoker_ids=[onboard_id]).send_event() return res Loading