diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d99119780fb8ea0b49729f7fd11b37ecb0c1161e Binary files /dev/null and b/.DS_Store differ diff --git a/sdk/sdk.py b/sdk/sdk.py index 7f6983534590e3f8d6232bad4337b6ac7e33c444..d98df5baf598a6cf93fb7f22e791d441bd4301b0 100644 --- a/sdk/sdk.py +++ b/sdk/sdk.py @@ -366,6 +366,90 @@ class CAPIFInvokerConnector: self.logger.error(f"Error during writing to file: {e}") raise + def update_Invoker(self): + self.logger.info("Updating Invoker") + try: + + capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token() + capif_onboarding_url = capif_postauth_info["ccf_onboarding_url"] + capif_access_token = capif_postauth_info["access_token"] + path=self.invoker_folder + "/cert_req.csr" + with open(path, "rb") as file: + public_key = file.read() + + self.__update_invoker_to_capif_and_create_the_signed_certificate( + public_key, capif_onboarding_url, capif_access_token + ) + + self.logger.info("Invoker updated successfully") + except Exception as e: + self.logger.error(f"Error during Invoker updating Invoker: {e}") + raise + + def __update_invoker_to_capif_and_create_the_signed_certificate( + self, public_key, capif_onboarding_url, capif_access_token + ): + self.logger.info("Updating Invoker to CAPIF and creating signed certificate by giving our public key to CAPIF") + try: + path = self.invoker_folder + "/" + self.capif_api_details_filename + + + with open(path, "r") as file: + invoker_details = file.read() + + + invoker_details = json.loads(invoker_details) + + + invoker_id = invoker_details["api_invoker_id"] + url = self.capif_https_url + capif_onboarding_url + "/" + invoker_id + payload_dict = { + "notificationDestination": self.capif_callback_url, + "supportedFeatures": "fffffff", + "apiInvokerInformation": self.csr_common_name, + "websockNotifConfig": { + "requestWebsocketUri": True, + "websocketUri": "websocketUri", + }, + "onboardingInformation": {"apiInvokerPublicKey": str(public_key, "utf-8")}, + "requestTestNotification": True, + } + payload = json.dumps(payload_dict) + headers = { + "Authorization": "Bearer {}".format(capif_access_token), + "Content-Type": "application/json", + } + signed_key_crt_path = os.path.join( + self.invoker_folder, + self.capif_invoker_username + ".crt" + ) + + private_key_path = os.path.join( + self.invoker_folder, + "private.key" + ) + pathca = os.path.join(self.invoker_folder,"ca.crt") + response = requests.request( + "PUT", + url, + headers=headers, + data=payload, + cert=(signed_key_crt_path, private_key_path), + verify=pathca, + ) + + response.raise_for_status() + + + self.logger.info("Invoker updated and signed certificate updated successfully") + + except Exception as e: + self.logger.error(f"Error during updating Invoker to CAPIF: {e}") + raise + + + + class CAPIFProviderConnector: """ Τhis class is responsible for onboarding an exposer (eg. NEF emulator) to CAPIF @@ -1178,6 +1262,21 @@ class CAPIFProviderConnector: file_name = capif_response_json.get("apiName", "default_name") # Default name if apiName is missing id=capif_response_json.get("apiId","default_id") + directory = self.provider_folder + + # Iterar sobre todos los archivos en el directorio + for filename in os.listdir(directory): + path = os.path.join(directory, filename) + + # Verificar si el archivo empieza con 'CAPIF-' + + if filename.startswith("CAPIF-") and id in filename: + + os.remove(path) # Salir del bucle si el archivo es eliminado + break + + + output_path = os.path.join(self.provider_folder, f"CAPIF-{file_name}-{id}-api.json") @@ -1192,7 +1291,10 @@ class CAPIFProviderConnector: if os.path.exists(output_path): with open(output_path, "r") as outfile: published_apis = json.load(outfile) - + + keys_to_remove = [key for key, value in published_apis.items() if value == id] + for key in keys_to_remove: + del published_apis[key] # Agregar el nuevo API publicado published_apis[file_name] = id @@ -1297,7 +1399,207 @@ class CAPIFProviderConnector: self.logger.error(f"Unexpected error while loading NEF API details: {e}") raise + def update_provider(self): + self.certs_modifications() + + capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token() + capif_onboarding_url = capif_postauth_info["ccf_api_onboarding_url"] + access_token = capif_postauth_info["access_token"] + ccf_publish_url = capif_postauth_info["ccf_publish_url"] + + onboarding_response=self.update_onboard(capif_onboarding_url,access_token) + + capif_registration_id = onboarding_response["apiProvDomId"] + self.__write_to_file( + onboarding_response, capif_registration_id, ccf_publish_url + ) + + def certs_modifications(self): + api_details = self.__load_provider_api_details() + + apf_count = 0 + aef_count = 0 + + # Itera sobre las claves del diccionario (los campos del JSON) + for key in api_details.keys(): + if key.startswith("APF"): + apf_count += 1 + elif key.startswith("AEF"): + aef_count += 1 + + # Loggea los resultados (o los retorna, depende de lo que necesites) + self.logger.info(f"Total APFs: {apf_count}, Total AEFs: {aef_count}") + APFscertstoremove = 0 + APFscertstoadd = 0 + AEFscertstoremove = 0 + AEFscertstoadd = 0 + # Calcula la diferencia de APFs + if apf_count != self.APFs: + diff = apf_count - self.APFs + if diff < 0: + self.APFscertstoadd = abs(diff) + else: + APFscertstoremove = diff + else: + APFscertstoremove = 0 + APFscertstoadd = 0 + + # Calcula la diferencia de AEFs + if aef_count != self.AEFs: + diff = aef_count - self.AEFs + if diff < 0: + self.AEFscertstoadd = abs(diff) + else: + AEFscertstoremove = diff + else: + AEFscertstoremove = 0 + AEFscertstoadd = 0 + + # Eliminar ficheros APF en orden descendente si hay más APFs de los que deberÃa haber + if APFscertstoremove: + while apf_count > self.APFs: + # List files starting with "APF-" or "apf-" in the directory + file_path = os.path.join(self.provider_folder, f"APF-{apf_count}_private_key.key") + os.remove(file_path) + self.logger.info(f"Removed APF file: APF-{apf_count}_private_key.key") + + file_path = os.path.join(self.provider_folder, f"APF-{apf_count}_public.csr") + os.remove(file_path) + self.logger.info(f"Removed APF file: APF-{apf_count}_public.csr") + + + file_path = os.path.join(self.provider_folder, f"apf-{apf_count}.crt") + os.remove(file_path) + self.logger.info(f"Removed APF file: apf-{apf_count}.crt") + # Decrease the APF count + apf_count -= 1 + + + + + + + + # Remove AEF files in descending order if there are more AEFs than there should be + if AEFscertstoremove: + while aef_count > self.AEFs: + # List files starting with "AEF-" or "aef-" in the directory + file_path = os.path.join(self.provider_folder, f"AEF-{aef_count}_private_key.key") + os.remove(file_path) + self.logger.info(f"Removed AEF file: AEF-{aef_count}_private_key.key") + + file_path = os.path.join(self.provider_folder, f"AEF-{aef_count}_public.csr") + os.remove(file_path) + self.logger.info(f"Removed AEF file: AEF-{aef_count}_public.csr") + + + file_path = os.path.join(self.provider_folder, f"aef-{aef_count}.crt") + os.remove(file_path) + self.logger.info(f"Removed AEF file: aef-{aef_count}.crt") + # Decrease the APF count + aef_count -= 1 + + + + def update_onboard(self,capif_onboarding_url,access_token): + self.logger.info("Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF") + api_details=self.__load_provider_api_details() + capif_id="/" + api_details["capif_registration_id"] + + url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}" + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # Crear la lista de roles sin indexar + roles = ["AMF"] + for n in range(1, self.AEFs + 1): + roles.append("AEF") + + for n in range(1, self.APFs + 1): + roles.append("APF") + + # Construir el payload con los roles sin indexar + payload = { + "apiProvFuncs": [ + {"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role, "apiProvFuncInfo": f"{role.lower()}"} + for role in roles + ], + "apiProvDomInfo": "This is provider", + "suppFeat": "fff", + "failReason": "string", + "regSec": access_token, + } + + # Generar los roles indexados para la creación de certificados + indexed_roles = ["AMF"] + [f"AEF-{n}" for n in range(1, self.AEFs + 1)] + [f"APF-{n}" for n in range(1, self.APFs + 1)] + + # Recorrer cada función de proveedor de API + for i, api_func in enumerate(payload["apiProvFuncs"]): + # Ruta de la carpeta de proveedores + folder_path = self.provider_folder + + # Verificar si la carpeta existe + if os.path.exists(folder_path): + found_key = False # Variable para controlar si ya se encontró una clave pública + + # Recorrer los archivos en la carpeta + for root, dirs, files in os.walk(folder_path): + for file_name in files: + if file_name.endswith(".csr"): + # Verificar si el archivo comienza con el rol esperado + role_prefix = indexed_roles[i] + if any(file_name.startswith(prefix) and role_prefix==prefix for prefix in [f"APF-{i+1}", f"AEF-{i+1}", "AMF"]): + file_path = os.path.join(root, file_name) + print(file_name) + # Leer la clave pública del archivo + with open(file_path, "r") as csr_file: + api_func["regInfo"]["apiProvPubKey"] = csr_file.read() + + found_key = True + break + + if found_key: + break + + # Si no se encuentra un archivo con la clave pública, generar una nueva clave + if not found_key: + print(indexed_roles[i]) + public_key = self.__create_private_and_public_keys(indexed_roles[i]) + api_func["regInfo"]["apiProvPubKey"] = public_key.decode("utf-8") + + cert = ( + os.path.join(self.provider_folder, f"amf.crt"), + os.path.join(self.provider_folder, f"AMF_private_key.key"), + ) + + + + + + + try: + response = requests.put( + url, + headers=headers, + data=json.dumps(payload), + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + + response.raise_for_status() + self.logger.info("Provider onboarded and signed certificate obtained successfully") + return response.json() + except requests.exceptions.RequestException as e: + self.logger.error(f"Onboarding failed: {e}") + raise + + + + + class ServiceDiscoverer: @@ -1694,7 +1996,6 @@ class ServiceDiscoverer: self.capif_api_details["registered_security_contexes"].append({"api_name":api_name,"api_id": api_id, "aef_id": aef_id}) self.save_api_details() - import json def save_api_details(self): try: