Loading services/TS29222_CAPIF_Publish_Service_API/published_apis/controllers/individual_apf_published_api_controller.py +56 −3 Original line number Diff line number Diff line Loading @@ -8,8 +8,55 @@ from published_apis.models.service_api_description import ServiceAPIDescription from published_apis.models.service_api_description_patch import ServiceAPIDescriptionPatch # noqa: E501 from published_apis import util from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.validate_user import ControlAccess from functools import wraps def modify_ind_apf_pub_api(service_api_id, apf_id, service_api_description_patch): # noqa: E501 from flask import request, current_app from ..core.serviceapidescriptions import PublishServiceOperations service_operations = PublishServiceOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate( str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid( x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() service_api_id = None if 'serviceApiId' in args: service_api_id = args["serviceApiId"] result = valid_user.validate_user_cert( args["apfId"], cert_signature, service_api_id) if result is not None: return result result = service_operations.check_apf(args["apfId"]) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def modify_ind_apf_pub_api(service_api_id, apf_id, body): # noqa: E501 """modify_ind_apf_pub_api Modify an existing published service API. # noqa: E501 Loading @@ -23,6 +70,12 @@ def modify_ind_apf_pub_api(service_api_id, apf_id, service_api_description_patch :rtype: Union[ServiceAPIDescription, Tuple[ServiceAPIDescription, int], Tuple[ServiceAPIDescription, int, Dict[str, str]] """ current_app.logger.info( "Patching service api id with id: " + service_api_id) if request.is_json: service_api_description_patch = ServiceAPIDescriptionPatch.from_dict(request.get_json()) # noqa: E501 return 'do some magic!' body = ServiceAPIDescriptionPatch.from_dict(request.get_json()) # noqa: E501 response = service_operations.patch_serviceapidescription( service_api_id, apf_id, body) return response services/TS29222_CAPIF_Publish_Service_API/published_apis/core/serviceapidescriptions.py +90 −6 Original line number Diff line number Diff line Loading @@ -253,17 +253,24 @@ class PublishServiceOperations(Resource): "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}) "apf_id":1, "onboarding_date": 1}) if serviceapidescription_old is None: current_app.logger.error(service_api_not_found_message) return not_found_error(detail="Service API not existing", cause="Service API id not found") service_api_description = service_api_description.to_dict() api_status = service_api_description.get("api_status", None) service_api_description = clean_empty(service_api_description) if api_status: service_api_description["api_status"]=api_status service_api_description["apf_id"] = serviceapidescription_old["apf_id"] service_api_description["onboarding_date"] = serviceapidescription_old["onboarding_date"] service_api_description["api_id"] = serviceapidescription_old["api_id"] result = mycol.find_one_and_update( result = mycol.find_one_and_replace( serviceapidescription_old, {"$set": service_api_description}, service_api_description, projection={"_id": 0, "api_name": 1, "api_id": 1, Loading @@ -287,8 +294,6 @@ class PublishServiceOperations(Resource): response = make_response( object=service_api_description_updated, status=200) service_api_description = ServiceAPIDescription.from_dict( json.dumps(service_api_description_updated, cls=CustomJSONEncoder)) if response.status_code == 200: RedisEvent("SERVICE_API_UPDATE", service_api_descriptions=[service_api_description_updated]).send_event() Loading @@ -306,11 +311,90 @@ class PublishServiceOperations(Resource): current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def patch_serviceapidescription(self, service_api_id, apf_id, patch_service_api_description): mycol = self.db.get_col_by_name(self.db.service_api_descriptions) try: current_app.logger.debug( "Patching service api with id: " + service_api_id) my_query = {'apf_id': apf_id, 'api_id': service_api_id} serviceapidescription_old = mycol.find_one(my_query, {"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1, "supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}) if serviceapidescription_old is None: current_app.logger.error(service_api_not_found_message) return not_found_error(detail="Service API not existing", cause="Service API id not found") patch_service_api_description = patch_service_api_description.to_dict() api_status = patch_service_api_description.get("api_status", None) patch_service_api_description = clean_empty(patch_service_api_description) if api_status: patch_service_api_description["api_status"]=api_status result = mycol.find_one_and_update( my_query, {"$set": patch_service_api_description}, projection={"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1, "supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}, return_document=ReturnDocument.AFTER, upsert=False) result = clean_empty(result) current_app.logger.debug("Patched service api") service_api_description_updated = dict_to_camel_case(result) current_app.logger.debug(service_api_description_updated) response = make_response( object=service_api_description_updated, status=200) if response.status_code == 200: RedisEvent("SERVICE_API_UPDATE", service_api_descriptions=[service_api_description_updated]).send_event() my_service_api = clean_n_camel_case(serviceapidescription_old) self.send_events_on_update( service_api_id, my_service_api, service_api_description_updated) return response except Exception as e: exception = "An exception occurred in update service" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def send_events_on_update(self, service_api_id, service_api_description_old, service_api_description_new): current_app.logger.debug("send Events if needed") current_app.logger.debug("Send Events if needed") service_api_status_event_old = self.service_api_availability_event( service_api_description_old) service_api_status_event_new = self.service_api_availability_event( Loading Loading
services/TS29222_CAPIF_Publish_Service_API/published_apis/controllers/individual_apf_published_api_controller.py +56 −3 Original line number Diff line number Diff line Loading @@ -8,8 +8,55 @@ from published_apis.models.service_api_description import ServiceAPIDescription from published_apis.models.service_api_description_patch import ServiceAPIDescriptionPatch # noqa: E501 from published_apis import util from cryptography import x509 from cryptography.hazmat.backends import default_backend from ..core.validate_user import ControlAccess from functools import wraps def modify_ind_apf_pub_api(service_api_id, apf_id, service_api_description_patch): # noqa: E501 from flask import request, current_app from ..core.serviceapidescriptions import PublishServiceOperations service_operations = PublishServiceOperations() valid_user = ControlAccess() def cert_validation(): def _cert_validation(f): @wraps(f) def __cert_validation(*args, **kwargs): args = request.view_args cert_tmp = request.headers['X-Ssl-Client-Cert'] cert_raw = cert_tmp.replace('\t', '') cert = x509.load_pem_x509_certificate( str.encode(cert_raw), default_backend()) cn = cert.subject.get_attributes_for_oid( x509.OID_COMMON_NAME)[0].value.strip() if cn != "superadmin": cert_signature = cert.signature.hex() service_api_id = None if 'serviceApiId' in args: service_api_id = args["serviceApiId"] result = valid_user.validate_user_cert( args["apfId"], cert_signature, service_api_id) if result is not None: return result result = service_operations.check_apf(args["apfId"]) if result is not None: return result result = f(**kwargs) return result return __cert_validation return _cert_validation @cert_validation() def modify_ind_apf_pub_api(service_api_id, apf_id, body): # noqa: E501 """modify_ind_apf_pub_api Modify an existing published service API. # noqa: E501 Loading @@ -23,6 +70,12 @@ def modify_ind_apf_pub_api(service_api_id, apf_id, service_api_description_patch :rtype: Union[ServiceAPIDescription, Tuple[ServiceAPIDescription, int], Tuple[ServiceAPIDescription, int, Dict[str, str]] """ current_app.logger.info( "Patching service api id with id: " + service_api_id) if request.is_json: service_api_description_patch = ServiceAPIDescriptionPatch.from_dict(request.get_json()) # noqa: E501 return 'do some magic!' body = ServiceAPIDescriptionPatch.from_dict(request.get_json()) # noqa: E501 response = service_operations.patch_serviceapidescription( service_api_id, apf_id, body) return response
services/TS29222_CAPIF_Publish_Service_API/published_apis/core/serviceapidescriptions.py +90 −6 Original line number Diff line number Diff line Loading @@ -253,17 +253,24 @@ class PublishServiceOperations(Resource): "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}) "apf_id":1, "onboarding_date": 1}) if serviceapidescription_old is None: current_app.logger.error(service_api_not_found_message) return not_found_error(detail="Service API not existing", cause="Service API id not found") service_api_description = service_api_description.to_dict() api_status = service_api_description.get("api_status", None) service_api_description = clean_empty(service_api_description) if api_status: service_api_description["api_status"]=api_status service_api_description["apf_id"] = serviceapidescription_old["apf_id"] service_api_description["onboarding_date"] = serviceapidescription_old["onboarding_date"] service_api_description["api_id"] = serviceapidescription_old["api_id"] result = mycol.find_one_and_update( result = mycol.find_one_and_replace( serviceapidescription_old, {"$set": service_api_description}, service_api_description, projection={"_id": 0, "api_name": 1, "api_id": 1, Loading @@ -287,8 +294,6 @@ class PublishServiceOperations(Resource): response = make_response( object=service_api_description_updated, status=200) service_api_description = ServiceAPIDescription.from_dict( json.dumps(service_api_description_updated, cls=CustomJSONEncoder)) if response.status_code == 200: RedisEvent("SERVICE_API_UPDATE", service_api_descriptions=[service_api_description_updated]).send_event() Loading @@ -306,11 +311,90 @@ class PublishServiceOperations(Resource): current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def patch_serviceapidescription(self, service_api_id, apf_id, patch_service_api_description): mycol = self.db.get_col_by_name(self.db.service_api_descriptions) try: current_app.logger.debug( "Patching service api with id: " + service_api_id) my_query = {'apf_id': apf_id, 'api_id': service_api_id} serviceapidescription_old = mycol.find_one(my_query, {"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1, "supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}) if serviceapidescription_old is None: current_app.logger.error(service_api_not_found_message) return not_found_error(detail="Service API not existing", cause="Service API id not found") patch_service_api_description = patch_service_api_description.to_dict() api_status = patch_service_api_description.get("api_status", None) patch_service_api_description = clean_empty(patch_service_api_description) if api_status: patch_service_api_description["api_status"]=api_status result = mycol.find_one_and_update( my_query, {"$set": patch_service_api_description}, projection={"_id": 0, "api_name": 1, "api_id": 1, "aef_profiles": 1, "description": 1, "supported_features": 1, "shareable_info": 1, "service_api_category": 1, "api_supp_feats": 1, "pub_api_path": 1, "ccf_id": 1, "api_status": 1}, return_document=ReturnDocument.AFTER, upsert=False) result = clean_empty(result) current_app.logger.debug("Patched service api") service_api_description_updated = dict_to_camel_case(result) current_app.logger.debug(service_api_description_updated) response = make_response( object=service_api_description_updated, status=200) if response.status_code == 200: RedisEvent("SERVICE_API_UPDATE", service_api_descriptions=[service_api_description_updated]).send_event() my_service_api = clean_n_camel_case(serviceapidescription_old) self.send_events_on_update( service_api_id, my_service_api, service_api_description_updated) return response except Exception as e: exception = "An exception occurred in update service" current_app.logger.error(exception + "::" + str(e)) return internal_server_error(detail=exception, cause=str(e)) def send_events_on_update(self, service_api_id, service_api_description_old, service_api_description_new): current_app.logger.debug("send Events if needed") current_app.logger.debug("Send Events if needed") service_api_status_event_old = self.service_api_availability_event( service_api_description_old) service_api_status_event_new = self.service_api_availability_event( Loading