Loading services/TS29222_CAPIF_Auditing_API/logs/core/auditoperations.py +18 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,17 @@ from .resources import Resource from .responses import bad_request_error, internal_server_error, make_response, not_found_error TOTAL_FEATURES = 2 SUPPORTED_FEATURES_HEX = "1" def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] return { "EnQueryInvokeLog": True if final_supp_feat[0] == "1" else False, "SliceBasedAPIExposure": True if final_supp_feat[1] == "1" else False, "Final": hex(int(final_supp_feat[::-1], 2))[2:] } class AuditOperations (Resource): def get_logs(self, query_parameters): Loading Loading @@ -55,6 +66,13 @@ class AuditOperations (Resource): if not result['logs']: return not_found_error(detail="Parameters do not match any log entry", cause="No logs found") client_features = query_parameters.get('supported_features') if client_features: negotiated = return_negotiated_supp_feat_dict(client_features) result['supported_features'] = negotiated["Final"] else: result['supported_features'] = client_features invocation_log = InvocationLog(result['aef_id'], result['api_invoker_id'], result['logs'], result['supported_features']) res = make_response(object=serialize_clean_camel_case(invocation_log), status=200) Loading services/TS29222_CAPIF_Security_API/capif_security/core/servicesecurity.py +18 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,18 @@ security_context_not_found_detail = "Security context not found" api_invoker_no_context_cause = "API Invoker has no security context" TOTAL_FEATURES = 3 SUPPORTED_FEATURES_HEX = "4" def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] return { "Notification_test_event": True if final_supp_feat[0] == "1" else False, "Notification_websocket": True if final_supp_feat[1] == "1" else False, "SecurityInfoPerAPI": True if final_supp_feat[2] == "1" else False, "Final": hex(int(final_supp_feat[::-1], 2))[2:] } class SecurityOperations(Resource): def __check_invoker(self, api_invoker_id): Loading Loading @@ -252,6 +264,9 @@ class SecurityOperations(Resource): "Already security context defined with same api invoker id") return forbidden_error(detail="Security method already defined", cause="Identical AEF Profile IDs") negotiated = return_negotiated_supp_feat_dict(service_security.supported_features) service_security.supported_features = negotiated["Final"] for service_instance in service_security.security_info: if service_instance.interface_details is not None: Loading Loading @@ -487,6 +502,9 @@ class SecurityOperations(Resource): mycol = self.db.get_col_by_name(self.db.security_info) try: negotiated_supported_features = return_negotiated_supp_feat_dict(service_security.supported_features) service_security.supported_features = negotiated_supported_features["Final"] current_app.logger.debug("Updating security context") result = self.__check_invoker(api_invoker_id) if result != None: Loading Loading
services/TS29222_CAPIF_Auditing_API/logs/core/auditoperations.py +18 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,17 @@ from .resources import Resource from .responses import bad_request_error, internal_server_error, make_response, not_found_error TOTAL_FEATURES = 2 SUPPORTED_FEATURES_HEX = "1" def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] return { "EnQueryInvokeLog": True if final_supp_feat[0] == "1" else False, "SliceBasedAPIExposure": True if final_supp_feat[1] == "1" else False, "Final": hex(int(final_supp_feat[::-1], 2))[2:] } class AuditOperations (Resource): def get_logs(self, query_parameters): Loading Loading @@ -55,6 +66,13 @@ class AuditOperations (Resource): if not result['logs']: return not_found_error(detail="Parameters do not match any log entry", cause="No logs found") client_features = query_parameters.get('supported_features') if client_features: negotiated = return_negotiated_supp_feat_dict(client_features) result['supported_features'] = negotiated["Final"] else: result['supported_features'] = client_features invocation_log = InvocationLog(result['aef_id'], result['api_invoker_id'], result['logs'], result['supported_features']) res = make_response(object=serialize_clean_camel_case(invocation_log), status=200) Loading
services/TS29222_CAPIF_Security_API/capif_security/core/servicesecurity.py +18 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,18 @@ security_context_not_found_detail = "Security context not found" api_invoker_no_context_cause = "API Invoker has no security context" TOTAL_FEATURES = 3 SUPPORTED_FEATURES_HEX = "4" def return_negotiated_supp_feat_dict(supp_feat): final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1] return { "Notification_test_event": True if final_supp_feat[0] == "1" else False, "Notification_websocket": True if final_supp_feat[1] == "1" else False, "SecurityInfoPerAPI": True if final_supp_feat[2] == "1" else False, "Final": hex(int(final_supp_feat[::-1], 2))[2:] } class SecurityOperations(Resource): def __check_invoker(self, api_invoker_id): Loading Loading @@ -252,6 +264,9 @@ class SecurityOperations(Resource): "Already security context defined with same api invoker id") return forbidden_error(detail="Security method already defined", cause="Identical AEF Profile IDs") negotiated = return_negotiated_supp_feat_dict(service_security.supported_features) service_security.supported_features = negotiated["Final"] for service_instance in service_security.security_info: if service_instance.interface_details is not None: Loading Loading @@ -487,6 +502,9 @@ class SecurityOperations(Resource): mycol = self.db.get_col_by_name(self.db.security_info) try: negotiated_supported_features = return_negotiated_supp_feat_dict(service_security.supported_features) service_security.supported_features = negotiated_supported_features["Final"] current_app.logger.debug("Updating security context") result = self.__check_invoker(api_invoker_id) if result != None: Loading