Skip to content
CAPIFProviderConnector.py 49.5 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
                f"Unexpected error: {e} - Response: {response.text}")
            raise

    def __remove_files(self):
        self.logger.info("Removing files generated")
        try:
            folder_path = self.provider_folder

            if os.path.exists(folder_path):
                # Elimina todo el contenido dentro de la carpeta, incluyendo archivos y subcarpetas
                for root, dirs, files in os.walk(folder_path):
                    for file in files:
                        os.remove(os.path.join(root, file))
                    for dir in dirs:
                        shutil.rmtree(os.path.join(root, dir))
                os.rmdir(folder_path)
                self.logger.info(
                    f"All contents in {folder_path} removed successfully.")
            else:
                self.logger.warning(f"Folder {folder_path} does not exist.")
        except Exception as e:
            self.logger.error(f"Error during removing folder contents: {e}")
            raise

    def __load_provider_api_details(self) -> dict:
        """
        Loads NEF API details from the CAPIF provider details JSON file.

        :return: A dictionary containing NEF API details.
        :raises FileNotFoundError: If the CAPIF provider details file is not found.
        :raises json.JSONDecodeError: If there is an error decoding the JSON file.
        """
        file_path = os.path.join(self.provider_folder,
                                 "capif_provider_details.json")

        try:
            with open(file_path, "r") as file:
                return json.load(file)
        except FileNotFoundError:
            self.logger.error(f"File not found: {file_path}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(
                f"Error decoding JSON from file {file_path}: {e}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error while loading NEF API details: {e}")
            raise

    def update_provider(self):
        self.certs_modifications()

        capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token()
        capif_onboarding_url = capif_postauth_info["ccf_api_onboarding_url"]
        access_token = capif_postauth_info["access_token"]
        ccf_publish_url = capif_postauth_info["ccf_publish_url"]

        onboarding_response = self.update_onboard(
            capif_onboarding_url, access_token)

        capif_registration_id = onboarding_response["apiProvDomId"]
        self.__write_to_file(
            onboarding_response, capif_registration_id, ccf_publish_url
        )

    def certs_modifications(self):
        api_details = self.__load_provider_api_details()

        apf_count = 0
        aef_count = 0

        # Itera sobre las claves del diccionario (los campos del JSON)
        for key in api_details.keys():
            if key.startswith("APF"):
                apf_count += 1
            elif key.startswith("AEF"):
                aef_count += 1

        # Loggea los resultados (o los retorna, depende de lo que necesites)
        self.logger.info(f"Total APFs: {apf_count}, Total AEFs: {aef_count}")
        APFscertstoremove = 0
        APFscertstoadd = 0
        AEFscertstoremove = 0
        AEFscertstoadd = 0
        # Calcula la diferencia de APFs
        if apf_count != self.apfs:
            diff = apf_count - self.apfs
            if diff < 0:
                self.APFscertstoadd = abs(diff)
            else:
                APFscertstoremove = diff
        else:
            APFscertstoremove = 0
            APFscertstoadd = 0

        # Calcula la diferencia de AEFs
        if aef_count != self.aefs:
            diff = aef_count - self.aefs
            if diff < 0:
                self.AEFscertstoadd = abs(diff)
            else:
                AEFscertstoremove = diff
        else:
            AEFscertstoremove = 0
            AEFscertstoadd = 0

        # Eliminar ficheros APF en orden descendente si hay más APFs de los que debería haber
        if APFscertstoremove:
            while apf_count > self.apfs:
                # List files starting with "APF-" or "apf-" in the directory
                file_path = os.path.join(
                    self.provider_folder, f"APF-{apf_count}_private_key.key")
                os.remove(file_path)
                self.logger.info(
                    f"Removed APF file: APF-{apf_count}_private_key.key")

                file_path = os.path.join(
                    self.provider_folder, f"APF-{apf_count}_public.csr")
                os.remove(file_path)
                self.logger.info(
                    f"Removed APF file: APF-{apf_count}_public.csr")

                file_path = os.path.join(
                    self.provider_folder, f"apf-{apf_count}.crt")
                os.remove(file_path)
                self.logger.info(f"Removed APF file: apf-{apf_count}.crt")
                # Decrease the APF count
                apf_count -= 1

        # Remove AEF files in descending order if there are more AEFs than there should be
        if AEFscertstoremove:
            while aef_count > self.aefs:
                # List files starting with "AEF-" or "aef-" in the directory
                file_path = os.path.join(
                    self.provider_folder, f"AEF-{aef_count}_private_key.key")
                os.remove(file_path)
                self.logger.info(
                    f"Removed AEF file: AEF-{aef_count}_private_key.key")

                file_path = os.path.join(
                    self.provider_folder, f"AEF-{aef_count}_public.csr")
                os.remove(file_path)
                self.logger.info(
                    f"Removed AEF file: AEF-{aef_count}_public.csr")

                file_path = os.path.join(
                    self.provider_folder, f"aef-{aef_count}.crt")
                os.remove(file_path)
                self.logger.info(f"Removed AEF file: aef-{aef_count}.crt")
                # Decrease the APF count
                aef_count -= 1

    def update_onboard(self, capif_onboarding_url, access_token):
        self.logger.info(
            "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
        api_details = self.__load_provider_api_details()
        capif_id = "/" + api_details["capif_registration_id"]

        url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Crear la lista de roles sin indexar
        roles = ["AMF"]
        for n in range(1, self.aefs + 1):
            roles.append("AEF")

        for n in range(1, self.apfs + 1):
            roles.append("APF")

        # Construir el payload con los roles sin indexar
        payload = {
            "apiProvFuncs": [
                {"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role,
                    "apiProvFuncInfo": f"{role.lower()}"}
                for role in roles
            ],
            "apiProvDomInfo": "This is provider",
            "suppFeat": "fff",
            "failReason": "string",
            "regSec": access_token,
        }

        # Generar los roles indexados para la creación de certificados
        indexed_roles = ["AMF"] + [f"AEF-{n}" for n in range(1, self.aefs + 1)] + [
            f"APF-{n}" for n in range(1, self.apfs + 1)]

        # Recorrer cada función de proveedor de API
        for i, api_func in enumerate(payload["apiProvFuncs"]):
            # Ruta de la carpeta de proveedores
            folder_path = self.provider_folder

            # Verificar si la carpeta existe
            if os.path.exists(folder_path):
                found_key = False  # Variable para controlar si ya se encontró una clave pública

                # Recorrer los archivos en la carpeta
                for root, dirs, files in os.walk(folder_path):
                    for file_name in files:
                        if file_name.endswith(".csr"):
                            # Verificar si el archivo comienza con el rol esperado
                            role_prefix = indexed_roles[i]
                            if any(file_name.startswith(prefix) and role_prefix == prefix for prefix in [f"APF-{i+1}", f"AEF-{i+1}", "AMF"]):
                                file_path = os.path.join(root, file_name)

                                # Leer la clave pública del archivo
                                with open(file_path, "r") as csr_file:
                                    api_func["regInfo"]["apiProvPubKey"] = csr_file.read(
                                    )

                                found_key = True
                                break

                    if found_key:
                        break

                # Si no se encuentra un archivo con la clave pública, generar una nueva clave
                if not found_key:

                    public_key = self.__create_private_and_public_keys(
                        indexed_roles[i])
                    api_func["regInfo"]["apiProvPubKey"] = public_key.decode(
                        "utf-8")

        cert = (
            os.path.join(self.provider_folder, "amf.crt"),
            os.path.join(self.provider_folder, "AMF_private_key.key"),
        )

        try:
            response = requests.put(
                url,
                headers=headers,
                data=json.dumps(payload),
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )

            response.raise_for_status()
            self.logger.info(
                "Provider onboarded and signed certificate obtained successfully")
            return response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Onboarding failed: {e} - Response: {response.text}")
            raise