Skip to content
Snippets Groups Projects
capif_provider_connector.py 50.3 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed

        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Error offboarding Provider: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error: {e} - Response: {response.text}")
            raise

    def __remove_files(self):
        """
        Deletes the provider folder together with everything stored inside it.

        When the folder does not exist a warning is logged and nothing is removed.

        :raises Exception: Any error raised while deleting is logged and re-raised.
        """
        self.logger.info("Removing files generated")
        try:
            folder_path = self.provider_folder

            if os.path.exists(folder_path):
                # shutil.rmtree deletes files, subfolders and the folder itself
                # in one call, so the tree is never modified while it is still
                # being walked.
                shutil.rmtree(folder_path)
                self.logger.info(
                    f"All contents in {folder_path} removed successfully.")
            else:
                self.logger.warning(f"Folder {folder_path} does not exist.")
        except Exception as e:
            self.logger.error(f"Error during removing folder contents: {e}")
            raise

    def __load_provider_api_details(self) -> dict:
        """
        Reads ``capif_provider_details.json`` from the provider folder.

        :return: The decoded content of the provider details file.
        :raises FileNotFoundError: If the provider details file does not exist.
        :raises json.JSONDecodeError: If the file content is not valid JSON.
        """
        details_path = os.path.join(
            self.provider_folder, "capif_provider_details.json")

        # Every failure is logged here and then handed back to the caller untouched
        try:
            with open(details_path, "r") as details_file:
                provider_details = json.load(details_file)
        except FileNotFoundError:
            self.logger.error(f"File not found: {details_path}")
            raise
        except json.JSONDecodeError as decode_error:
            self.logger.error(
                f"Error decoding JSON from file {details_path}: {decode_error}")
            raise
        except Exception as unexpected_error:
            self.logger.error(
                f"Unexpected error while loading NEF API details: {unexpected_error}")
            raise
        else:
            return provider_details

    def update_provider(self):
        """
        Runs the provider update sequence.

        The steps are, in order: adjust the stored certificate files
        (``certs_modifications``), fetch the CAPIF auth information, send the
        updated onboarding (``update_onboard``) and persist the response.
        """
        self.certs_modifications()

        # All three values are read up front so a missing key fails before
        # any request is sent
        postauth_info = self.__save_capif_ca_root_file_and_get_auth_token()
        onboarding_url = postauth_info["ccf_api_onboarding_url"]
        token = postauth_info["access_token"]
        publish_url = postauth_info["ccf_publish_url"]

        onboarding_result = self.update_onboard(onboarding_url, token)

        registration_id = onboarding_result["apiProvDomId"]
        self.__write_to_file(onboarding_result, registration_id, publish_url)

    def certs_modifications(self):
        """
        Removes the key and certificate files of surplus APFs and AEFs.

        The entries of the provider details file whose key starts with "APF"
        or "AEF" are counted and compared with ``self.apfs`` and ``self.aefs``.
        For every function above the configured amount, starting from the
        highest index, its private key, CSR and certificate are deleted from
        the provider folder. A file that is already missing is logged and
        skipped so the remaining files are still cleaned up.

        When fewer functions are stored than configured, the shortfall is
        written to ``self.APFscertstoadd`` / ``self.AEFscertstoadd``; these
        attributes are not touched otherwise.

        :raises FileNotFoundError: If the provider details file is not found.
        :raises json.JSONDecodeError: If the provider details file is not valid JSON.
        """
        api_details = self.__load_provider_api_details()

        # Each top-level key starting with the role name counts as one stored function
        apf_count = sum(1 for key in api_details if key.startswith("APF"))
        aef_count = sum(1 for key in api_details if key.startswith("AEF"))
        self.logger.info(f"Total APFs: {apf_count}, Total AEFs: {aef_count}")

        # Record how many functions are missing with respect to the configuration
        if apf_count < self.apfs:
            self.APFscertstoadd = self.apfs - apf_count
        if aef_count < self.aefs:
            self.AEFscertstoadd = self.aefs - aef_count

        for role, stored, wanted in (
            ("APF", apf_count, self.apfs),
            ("AEF", aef_count, self.aefs),
        ):
            # Descending order: the highest-numbered functions are dropped first
            for index in range(stored, wanted, -1):
                for file_name in (
                    f"{role}-{index}_private_key.key",
                    f"{role}-{index}_public.csr",
                    f"{role.lower()}-{index}.crt",
                ):
                    file_path = os.path.join(self.provider_folder, file_name)
                    try:
                        os.remove(file_path)
                    except FileNotFoundError:
                        # Nothing to delete; keep going with the other files
                        self.logger.warning(
                            f"{role} file not found, skipping: {file_name}")
                        continue
                    self.logger.info(f"Removed {role} file: {file_name}")

    def update_onboard(self, capif_onboarding_url, access_token):
        """
        Sends the updated provider onboarding to CAPIF with a PUT request.

        One provider function is sent per role: a single AMF, ``self.aefs``
        AEFs and ``self.apfs`` APFs. For each role the CSR already stored in
        the provider folder is reused as its public key; when no CSR exists
        for that role a new key pair is generated.

        :param capif_onboarding_url: Onboarding path appended to ``self.capif_https_url``.
        :param access_token: Token used as Bearer authorization and as ``regSec``.
        :return: The decoded JSON body of the CAPIF response.
        :raises requests.exceptions.RequestException: If the request cannot be
            completed or CAPIF answers with an error status.
        """
        self.logger.info(
            "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
        api_details = self.__load_provider_api_details()
        capif_id = "/" + api_details["capif_registration_id"]

        url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Roles as sent to CAPIF (without index)
        roles = ["AMF"] + ["AEF"] * self.aefs + ["APF"] * self.apfs

        # Same roles with their index, used to name the key files of each
        # function. Both lists share the same order.
        indexed_roles = (
            ["AMF"]
            + [f"AEF-{n}" for n in range(1, self.aefs + 1)]
            + [f"APF-{n}" for n in range(1, self.apfs + 1)]
        )

        payload = {
            "api_prov_funcs": [
                {
                    "regInfo": {"apiProvPubKey": ""},
                    "apiProvFuncRole": role,
                    "apiProvFuncInfo": f"{role.lower()}",
                }
                for role in roles
            ],
            "apiProvDomInfo": "This is provider",
            "suppFeat": "fff",
            "failReason": "string",
            "regSec": access_token,
        }

        folder_path = self.provider_folder

        def find_existing_csr(role_name):
            # Return the path of the CSR stored for role_name, or None
            for root, _dirs, files in os.walk(folder_path):
                for file_name in files:
                    if not (file_name.endswith(".csr")
                            and file_name.startswith(role_name)):
                        continue
                    # A digit right after the role name means a different
                    # index, e.g. "AEF-10_public.csr" while looking for "AEF-1"
                    if file_name[len(role_name):len(role_name) + 1].isdigit():
                        continue
                    return os.path.join(root, file_name)
            return None

        # Keys are only resolved when the provider folder exists
        if os.path.exists(folder_path):
            for role_name, api_func in zip(indexed_roles, payload["api_prov_funcs"]):
                csr_path = find_existing_csr(role_name)
                if csr_path is not None:
                    # Reuse the public key already generated for this role
                    with open(csr_path, "r") as csr_file:
                        api_func["regInfo"]["apiProvPubKey"] = csr_file.read()
                else:
                    # No CSR stored for this role yet: create a new key pair
                    public_key = self.__create_private_and_public_keys(
                        role_name)
                    api_func["regInfo"]["apiProvPubKey"] = public_key.decode(
                        "utf-8")

        cert = (
            os.path.join(self.provider_folder, "amf.crt"),
            os.path.join(self.provider_folder, "AMF_private_key.key"),
        )

        # Bound before the request so the error handler can tell whether a
        # response was received at all
        response = None
        try:
            response = requests.put(
                url,
                headers=headers,
                data=json.dumps(payload),
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )

            response.raise_for_status()
            self.logger.info(
                "Provider onboarded and signed certificate obtained successfully")
            return response.json()
        except requests.exceptions.RequestException as e:
            response_text = (
                response.text if response is not None else "no response received")
            self.logger.error(
                f"Onboarding failed: {e} - Response: {response_text}")
            raise