Loading opencapif_sdk/capif_provider_connector.py +56 −46 Original line number Diff line number Diff line Loading @@ -466,40 +466,44 @@ class capif_provider_connector: "Mismatch between number of AEFs and profiles") # Assigning each AEF for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): profile["aefId"] = aef_id for versions in profile["versions"]: check = True revoke = True for custom in versions["custOperations"]: if custom["custOpName"] == "check-authentication": check = False if custom["custOpName"] == "revoke-authentication": revoke = False # Si ambas condiciones ya son falsas, salir del bucle if not check and not revoke: break # If 'check-authentication' custom operation doesn't exist, add it if check: versions["custOperations"].append({ if not isinstance(profile, dict): # Verificar que profile sea un diccionario raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}") profile["aefId"] = aef_id # Asignar el ID de AEF versions = profile.get("versions") # Obtener versions i = 1 for version in versions: # Iterar sobre cada versión if not isinstance(version, dict): # Verificar que cada versión sea un diccionario raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}") # Obtener nombres existentes de operaciones personalizadas existing_operations = { op["custOpName"].strip() for op in version.get("custOperations", []) if isinstance(op, dict) } # Verificar y agregar `check-authentication` si no existe if "check-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Check authentication request." }) # If 'revoke-authentication' custom operation doesn't exist, add it if revoke: versions["custOperations"].append({ # Verificar y agregar `revoke-authentication` si no existe if "revoke-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Revoke authorization for service APIs." }) i -= 1 self.logger.info( "Service API description modified successfully") Loading Loading @@ -907,36 +911,42 @@ class capif_provider_connector: # Asing the chosen AEFs for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): profile["aefId"] = aef_id for versions in profile["versions"]: for custom in versions["custOperations"]: check = True revoke = True if custom["custOpName"] == "check-authentication": check = False if custom["custOpName"] == "revoke-authentication ": revoke = False # If 'check-authentication' custom operation doesn't exist, add it if check: versions["custOperations"].append({ if not isinstance(profile, dict): # Verificar que profile sea un diccionario raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}") profile["aefId"] = aef_id # Asignar el ID de AEF versions = profile.get("versions") # Obtener versions i = 1 for version in versions: # Iterar sobre cada versión if not isinstance(version, dict): # Verificar que cada versión sea un diccionario raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}") # Obtener nombres existentes de operaciones personalizadas existing_operations = { op["custOpName"].strip() for op in version.get("custOperations", []) if isinstance(op, dict) } # Verificar y agregar `check-authentication` si no existe if "check-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Check authentication request." }) # If 'revoke-authentication' custom operation doesn't exist, add it if revoke: versions["custOperations"].append({ # Verificar y agregar `revoke-authentication` si no existe if "revoke-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Revoke authorization for service APIs." }) i -= 1 self.logger.info( "Service API description modified successfully") Loading opencapif_sdk/service_discoverer.py +0 −4 Original line number Diff line number Diff line Loading @@ -296,7 +296,6 @@ class service_discoverer: payload["securityInfo"].append(security_info) try: print(payload) response = requests.put(url, json=payload, cert=(self.signed_key_crt_path, Loading Loading @@ -507,7 +506,6 @@ class service_discoverer: raise def check_authentication(self): print("hola") self.logger.info("Checking authentication") try: invoker_details = self.__load_provider_api_details() Loading @@ -519,7 +517,6 @@ class service_discoverer: "apiInvokerId": f"{invoker_id}", "supportedFeatures": f"{self.supported_features}" } print(self.supported_features) headers = { "Authorization": "Bearer {}".format(self.token), Loading @@ -533,7 +530,6 @@ class service_discoverer: json=payload ) print(response.text) response.raise_for_status() self.logger.info("Authentication of supported_features checked") Loading test/network_app_provider_api_spec.json +4 −4 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": [ "Oauth", "OAUTH", "PSK" ], "interfaceDescriptions": [ Loading @@ -55,7 +55,7 @@ "ipv4Addr": "127.0.0.1", "port": 8888, "securityMethods": [ "Oauth" "OAUTH" ] } ] Loading Loading @@ -123,14 +123,14 @@ "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": [ "Oauth" "OAUTH" ], "interfaceDescriptions": [ { "ipv4Addr": "127.0.0.1", "port": 8899, "securityMethods": [ "Oauth" "OAUTH" ] } ] Loading test/test.py +2 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,8 @@ if __name__ == "__main__": discoverer = service_discoverer(config_file=capif_sdk_config_path) discoverer.discover_filter["api-name"]= "Testtrece" discoverer.discover() print("SERVICE DISCOVER COMPLETED") Loading Loading
opencapif_sdk/capif_provider_connector.py +56 −46 Original line number Diff line number Diff line Loading @@ -466,40 +466,44 @@ class capif_provider_connector: "Mismatch between number of AEFs and profiles") # Assigning each AEF for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): profile["aefId"] = aef_id for versions in profile["versions"]: check = True revoke = True for custom in versions["custOperations"]: if custom["custOpName"] == "check-authentication": check = False if custom["custOpName"] == "revoke-authentication": revoke = False # Si ambas condiciones ya son falsas, salir del bucle if not check and not revoke: break # If 'check-authentication' custom operation doesn't exist, add it if check: versions["custOperations"].append({ if not isinstance(profile, dict): # Verificar que profile sea un diccionario raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}") profile["aefId"] = aef_id # Asignar el ID de AEF versions = profile.get("versions") # Obtener versions i = 1 for version in versions: # Iterar sobre cada versión if not isinstance(version, dict): # Verificar que cada versión sea un diccionario raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}") # Obtener nombres existentes de operaciones personalizadas existing_operations = { op["custOpName"].strip() for op in version.get("custOperations", []) if isinstance(op, dict) } # Verificar y agregar `check-authentication` si no existe if "check-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Check authentication request." }) # If 'revoke-authentication' custom operation doesn't exist, add it if revoke: versions["custOperations"].append({ # Verificar y agregar `revoke-authentication` si no existe if "revoke-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Revoke authorization for service APIs." }) i -= 1 self.logger.info( "Service API description modified successfully") Loading Loading @@ -907,36 +911,42 @@ class capif_provider_connector: # Asing the chosen AEFs for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): profile["aefId"] = aef_id for versions in profile["versions"]: for custom in versions["custOperations"]: check = True revoke = True if custom["custOpName"] == "check-authentication": check = False if custom["custOpName"] == "revoke-authentication ": revoke = False # If 'check-authentication' custom operation doesn't exist, add it if check: versions["custOperations"].append({ if not isinstance(profile, dict): # Verificar que profile sea un diccionario raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}") profile["aefId"] = aef_id # Asignar el ID de AEF versions = profile.get("versions") # Obtener versions i = 1 for version in versions: # Iterar sobre cada versión if not isinstance(version, dict): # Verificar que cada versión sea un diccionario raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}") # Obtener nombres existentes de operaciones personalizadas existing_operations = { op["custOpName"].strip() for op in version.get("custOperations", []) if isinstance(op, dict) } # Verificar y agregar `check-authentication` si no existe if "check-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Check authentication request." }) # If 'revoke-authentication' custom operation doesn't exist, add it if revoke: versions["custOperations"].append({ # Verificar y agregar `revoke-authentication` si no existe if "revoke-authentication" not in existing_operations and i == 1: version.setdefault("custOperations", []).append({ "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" ], "operations": ["POST"], "description": "Revoke authorization for service APIs." }) i -= 1 self.logger.info( "Service API description modified successfully") Loading
opencapif_sdk/service_discoverer.py +0 −4 Original line number Diff line number Diff line Loading @@ -296,7 +296,6 @@ class service_discoverer: payload["securityInfo"].append(security_info) try: print(payload) response = requests.put(url, json=payload, cert=(self.signed_key_crt_path, Loading Loading @@ -507,7 +506,6 @@ class service_discoverer: raise def check_authentication(self): print("hola") self.logger.info("Checking authentication") try: invoker_details = self.__load_provider_api_details() Loading @@ -519,7 +517,6 @@ class service_discoverer: "apiInvokerId": f"{invoker_id}", "supportedFeatures": f"{self.supported_features}" } print(self.supported_features) headers = { "Authorization": "Bearer {}".format(self.token), Loading @@ -533,7 +530,6 @@ class service_discoverer: json=payload ) print(response.text) response.raise_for_status() self.logger.info("Authentication of supported_features checked") Loading
test/network_app_provider_api_spec.json +4 −4 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": [ "Oauth", "OAUTH", "PSK" ], "interfaceDescriptions": [ Loading @@ -55,7 +55,7 @@ "ipv4Addr": "127.0.0.1", "port": 8888, "securityMethods": [ "Oauth" "OAUTH" ] } ] Loading Loading @@ -123,14 +123,14 @@ "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": [ "Oauth" "OAUTH" ], "interfaceDescriptions": [ { "ipv4Addr": "127.0.0.1", "port": 8899, "securityMethods": [ "Oauth" "OAUTH" ] } ] Loading
test/test.py +2 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,8 @@ if __name__ == "__main__": discoverer = service_discoverer(config_file=capif_sdk_config_path) discoverer.discover_filter["api-name"]= "Testtrece" discoverer.discover() print("SERVICE DISCOVER COMPLETED") Loading