Skip to content
Snippets Groups Projects
capif_provider_connector.py 60.7 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
                    break

            output_path = os.path.join(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                self.provider_folder, f"capif_{file_name}_{id}_api.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed

            with open(output_path, "w") as outfile:
                outfile.write(capif_response_text)
            self.logger.info(f"CAPIF response saved to {output_path}")
            output_path = os.path.join(

                self.provider_folder, "provider_service_ids.json")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Read the existing file of published APIs
            provider_service_ids = {}
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if os.path.exists(output_path):
                with open(output_path, "r") as outfile:
                    provider_service_ids = json.load(outfile)
JorgeEcheva26's avatar
JorgeEcheva26 committed

            keys_to_remove = [key for key,
                              value in provider_service_ids.items() if value == id]
JorgeEcheva26's avatar
JorgeEcheva26 committed
            for key in keys_to_remove:
                del provider_service_ids[key]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Add the new id of the published API

            provider_service_ids[file_name] = id
            self.provider_service_ids = provider_service_ids

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Update the file with the published APIs
JorgeEcheva26's avatar
JorgeEcheva26 committed
            with open(output_path, "w") as outfile:
                json.dump(provider_service_ids, outfile, indent=4)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.info(
                f"API '{file_name}' with ID '{id}' added to Published Apis.")
            return json.loads(capif_response_text)
        except requests.RequestException as e:
            self.logger.error(
                f"Request to CAPIF failed: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error during service publication: {e} - Response: {response.text}")
            raise

    def offboard_provider(self) -> None:
        """
        Offboard the provider and remove the files it generated.

        Runs ``offboard_nef`` first and the private file-removal step second.
        Any exception raised by either step is logged and re-raised unchanged.
        """
        try:
            self.offboard_nef()
            self.__remove_files()
            self.logger.info("Provider offboarded and deregistered successfully.")
        except Exception as error:
            self.logger.error(f"Failed to offboard and deregister Provider: {error}")
            raise

    def offboard_nef(self) -> None:
        """
        Offboards the NEF (Network Exposure Function) from CAPIF.

        Sends a DELETE request for the registration id stored in the provider
        details file, authenticating with the AMF certificate and private key
        found in the provider folder.

        :raises requests.exceptions.RequestException: If the request cannot be
            sent or CAPIF answers with an error status.
        :raises Exception: Any other failure (for example while loading the
            provider details) is logged and re-raised.
        """
        # Bound before the try block so the handlers below can always refer to
        # it, even when the failure happens before a response was received.
        response = None
        try:
            self.logger.info("Offboarding the provider")

            # Load CAPIF API details
            capif_api_details = self._load_provider_api_details()
            registration_id = capif_api_details['capif_registration_id']
            url = f"{self.capif_https_url}api-provider-management/v1/registrations/{registration_id}"

            # Define certificate paths
            cert_paths = (
                os.path.join(self.provider_folder, "amf.crt"),
                os.path.join(self.provider_folder, "AMF_private_key.key")
            )

            # Send DELETE request to offboard the provider
            response = requests.delete(
                url,
                cert=cert_paths,
                verify=os.path.join(self.provider_folder, "ca.crt")
            )

            response.raise_for_status()
            self.logger.info("Offboarding performed successfully")

        except requests.exceptions.RequestException as e:
            response_text = response.text if response is not None else "no response received"
            self.logger.error(
                f"Error offboarding Provider: {e} - Response: {response_text}")
            raise
        except Exception as e:
            response_text = response.text if response is not None else "no response received"
            self.logger.error(
                f"Unexpected error: {e} - Response: {response_text}")
            raise

    def __remove_files(self):
        """
        Delete the provider folder together with everything inside it.

        Logs a warning and does nothing else when the folder does not exist.

        :raises Exception: Any error raised while deleting is logged and
            re-raised.
        """
        self.logger.info("Removing files generated")
        try:
            folder_path = self.provider_folder

            if os.path.exists(folder_path):
                # rmtree removes files, sub-folders and symlinks in one pass,
                # so the tree is never modified while it is being walked.
                shutil.rmtree(folder_path)
                self.logger.info(
                    f"All contents in {folder_path} removed successfully.")
            else:
                self.logger.warning(f"Folder {folder_path} does not exist.")
        except Exception as e:
            self.logger.error(f"Error during removing folder contents: {e}")
            raise

    def _load_provider_api_details(self) -> dict:
JorgeEcheva26's avatar
JorgeEcheva26 committed
        """
        Loads NEF API details from the CAPIF provider details JSON file.

        :return: A dictionary containing NEF API details.
        :raises FileNotFoundError: If the CAPIF provider details file is not found.
        :raises json.JSONDecodeError: If there is an error decoding the JSON file.
        """
        file_path = os.path.join(self.provider_folder,
                                 "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed

        try:
            with open(file_path, "r") as file:
                return json.load(file)
        except FileNotFoundError:
            self.logger.error(f"File not found: {file_path}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(
                f"Error decoding JSON from file {file_path}: {e}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error while loading NEF API details: {e}")
            raise

    def update_provider(self, supp_features="0"):
        """
        Update the provider's onboarding in CAPIF.

        Steps, in order: remove the stored APF/AEF certificate files, fetch the
        post-authentication information (onboarding URL, access token and
        publish URL), send the onboarding update, and hand the response, the
        returned ``apiProvDomId`` and the publish URL to the private
        write-to-file helper.

        :param supp_features: Supported-features value forwarded to
            ``update_onboard``. Defaults to "0".
        """
        self.certs_modifications()

        capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token()
        capif_onboarding_url = capif_postauth_info["ccf_api_onboarding_url"]
        access_token = capif_postauth_info["access_token"]
        ccf_publish_url = capif_postauth_info["ccf_publish_url"]

        onboarding_response = self.update_onboard(
            capif_onboarding_url, access_token, supp_features)

        capif_registration_id = onboarding_response["apiProvDomId"]
        self.__write_to_file(
            onboarding_response, capif_registration_id, ccf_publish_url
        )

    def certs_modifications(self):
        """
        Delete the APF and AEF certificate files stored in the provider folder.

        A file is removed when its name starts with ``APF-``, ``apf-``,
        ``AEF-`` or ``aef-`` and ends with ``_private_key.key``,
        ``_public.csr`` or ``.crt``. Removal is best effort: a file that cannot
        be deleted is logged and skipped.
        """
        self.logger.info("Starting certificate removal process...")

        # str.startswith / str.endswith accept a tuple of alternatives
        cert_prefixes = ("APF-", "apf-", "AEF-", "aef-")
        cert_suffixes = ("_private_key.key", "_public.csr", ".crt")

        for file_name in os.listdir(self.provider_folder):
            if not (file_name.startswith(cert_prefixes) and file_name.endswith(cert_suffixes)):
                continue

            file_path = os.path.join(self.provider_folder, file_name)
            try:
                os.remove(file_path)
                self.logger.info(f"Removed certificate file: {file_name}")
            except OSError as e:
                self.logger.error(f"Error removing {file_name}: {e}")

        self.logger.info("Certificate removal process completed.")

    def update_onboard(self, capif_onboarding_url, access_token, supp_features):
        """
        Update the provider's onboarding registration in CAPIF with a PUT request.

        The payload declares one AMF function plus ``self.aefs`` AEF and
        ``self.apfs`` APF functions. When the provider folder exists, each
        function's ``apiProvPubKey`` is read from an existing ``.csr`` file
        whose name starts with the function's indexed role (``AMF``,
        ``AEF-<n>``, ``APF-<n>``); when no such file exists a new key pair is
        generated for that role.

        :param capif_onboarding_url: Onboarding path appended to ``self.capif_https_url``.
        :param access_token: Bearer token for the request; also sent as ``regSec``.
        :param supp_features: Value sent as ``suppFeat``.
        :return: The decoded JSON body of the CAPIF response.
        :raises requests.exceptions.RequestException: If the request cannot be
            sent or CAPIF answers with an error status.
        """
        self.logger.info(
            "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
        api_details = self._load_provider_api_details()
        capif_id = "/" + api_details["capif_registration_id"]

        url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Unindexed roles go into the payload; indexed roles name the key/CSR
        # files. Both lists are built in the same order so they stay aligned.
        roles = ["AMF"] + ["AEF"] * self.aefs + ["APF"] * self.apfs
        indexed_roles = (
            ["AMF"]
            + [f"AEF-{n}" for n in range(1, self.aefs + 1)]
            + [f"APF-{n}" for n in range(1, self.apfs + 1)]
        )

        # Build the payload with unindexed roles
        payload = {
            "apiProvFuncs": [
                {"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role,
                    "apiProvFuncInfo": f"{role.lower()}"}
                for role in roles
            ],
            "apiProvDomInfo": "This is provider",
            "suppFeat": supp_features,
            "failReason": "string",
            "regSec": access_token,
        }

        folder_path = self.provider_folder
        if os.path.exists(folder_path):
            # Collect the CSR files once instead of re-walking the folder for
            # every provider function.
            csr_files = [
                (file_name, os.path.join(root, file_name))
                for root, _dirs, files in os.walk(folder_path)
                for file_name in files
                if file_name.endswith(".csr")
            ]

            for api_func, indexed_role in zip(payload["apiProvFuncs"], indexed_roles):
                # Match on the function's own indexed role. The character
                # after the prefix must not be a digit so that "AEF-1" does
                # not pick up the CSR that belongs to "AEF-10".
                csr_path = next(
                    (
                        path
                        for file_name, path in csr_files
                        if file_name.startswith(indexed_role)
                        and not file_name[len(indexed_role):len(indexed_role) + 1].isdigit()
                    ),
                    None,
                )

                if csr_path is not None:
                    # Read the public key from the file
                    with open(csr_path, "r") as csr_file:
                        api_func["regInfo"]["apiProvPubKey"] = csr_file.read()
                else:
                    # No CSR stored for this role: generate a new key pair
                    public_key = self.__create_private_and_public_keys(
                        indexed_role)
                    api_func["regInfo"]["apiProvPubKey"] = public_key.decode(
                        "utf-8")

        cert = (
            os.path.join(self.provider_folder, "amf.crt"),
            os.path.join(self.provider_folder, "AMF_private_key.key"),
        )

        # Bound before the try block so the handler can refer to it even when
        # the request fails before any response is received.
        response = None
        try:
            response = requests.put(
                url,
                headers=headers,
                data=json.dumps(payload),
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )

            response.raise_for_status()
            self.logger.info(
                "Provider onboarded and signed certificate obtained successfully")
            return response.json()
        except requests.exceptions.RequestException as e:
            response_text = response.text if response is not None else "no response received"
            self.logger.error(
                f"Onboarding failed: {e} - Response: {response_text}")
            raise

    def _create_or_update_file(self, file_name, file_type, content, mode="w"):
        """
        Create or update a file with the specified content.

        :param file_name: Name of the file (without extension).
        :param file_type: File type or extension (e.g., "txt", "json", "html").
        :param content: Content to write into the file. Can be a string, dictionary, or list.
        :param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'.
        """
        # Validate the mode
        if mode not in ["w", "a"]:
            raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).")

        # Construct the full file name
        full_file_name = f"{file_name}.{file_type}"
        full_path = os.path.join(self.provider_folder, full_file_name)

        # Ensure the content is properly formatted
        if isinstance(content, (dict, list)):
            if file_type == "json":
                try:
                    # Serialize content to JSON
                    content = json.dumps(content, indent=4)
                except TypeError as e:
                    raise ValueError(f"Failed to serialize content to JSON: {e}")
            else:
                raise TypeError("Content must be a string when the file type is not JSON.")
        elif not isinstance(content, str):
            raise TypeError("Content must be a string, dictionary, or list.")

        try:
            # Open the file in the specified mode
            with open(full_path, mode, encoding="utf-8") as file:
                file.write(content)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            # Log success based on the mode
            if mode == "w":
                self.logger.info(f"File '{full_file_name}' created or overwritten successfully.")
            elif mode == "a":
                self.logger.info(f"Content appended to file '{full_file_name}' successfully.")
        except Exception as e:
            self.logger.error(f"Error handling the file '{full_file_name}': {e}")
            raise
    def _find_key_by_value(self, data, target_value):
        """
        Given a dictionary and a value, return the key corresponding to that value.

        :param data: Dictionary to search.
        :param target_value: Value to find the corresponding key for.
        :return: Key corresponding to the target value, or None if not found.
        """
        for key, value in data.items():
            if value == target_value:
                return key
        return None
    def _load_config_file(self, config_file: str):
        """Loads the configuration file."""
        try:
            with open(config_file, 'r') as file:
                return json.load(file)
        except FileNotFoundError:
            self.logger.warning(
                f"Configuration file {config_file} not found. Using defaults or environment variables.")
JorgeEcheva26's avatar
JorgeEcheva26 committed
            return {}
    
    def check_invoker_authentication(self, invoker_id, aef_id, api_id, supportedfeatures_rx, cert_rx):
        """
        Look up the invoker's security information and check it for one API.

        :param invoker_id: Identifier of the API invoker.
        :param aef_id: Identifier of the AEF.
        :param api_id: Identifier of the API.
        :param supportedfeatures_rx: Supported-features value passed on to the security check.
        :param cert_rx: Certificate value passed on to the security check.
        :return: The error dictionary produced by the trusted-invokers lookup
            when that lookup returns a dictionary with a "status" key;
            otherwise the result of ``_check_service_security``.
        """
        trusted_invokers = self._get_trusted_invokers(invoker_id, aef_id)

        # A dictionary carrying "status" is an error report: hand it back as is
        lookup_failed = isinstance(trusted_invokers, dict) and "status" in trusted_invokers
        if lookup_failed:
            return trusted_invokers

        return self._check_service_security(
            trusted_invokers, aef_id, api_id, supportedfeatures_rx, cert_rx)

    def _get_trusted_invokers(self, invoker_id, aef_id):
        if not aef_id:
            return self._problem_details(
                status=500,
                title="Internal Server Error",
                detail="AEF ID is missing.",
                instance=f"/capif-security/v1/trustedInvokers/{invoker_id}",
                cause="MissingAEFId"
            )
        if not invoker_id:
            return self._problem_details(
                status=500,
                title="Internal Server Error",
                detail="Invoker ID is missing.",
                instance=f"/capif-security/v1/trustedInvokers/{invoker_id}",
                cause="MissingAEFId"
            )

        url = f"{self.capif_https_url}/capif-security/v1/trustedInvokers/{invoker_id}?authenticationInfo={True}&authorizationInfo={True}"
        
        provider_details = self._load_provider_api_details()
        key = self._find_key_by_value(data=provider_details, target_value=aef_id)

        if not key:
            return self._problem_details(
                status=500,
                title="Internal Server Error",
                detail=f"No key found for AEF ID: {aef_id}.",
                instance=url,
                cause="KeyNotFound"
            )

        keylow = key.lower()
        cert = (
            os.path.join(self.provider_folder, f"{keylow}.crt"),
            os.path.join(self.provider_folder, f"{key}_private_key.key")
        )

        try:
            response = requests.get(
                url,
                headers={"Content-Type": "application/json"},
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt")
            )
            response.raise_for_status()
            return response.json()  

        except requests.RequestException as e:
            return self._problem_details(
                status=502,
                title="Bad Gateway",
                detail=f"Failed to retrieve trusted invokers: {str(e)}",
                instance=url,
                cause="RequestFailure"
            )

    def _check_service_security(self, service_security, aef_id, api_id, supportedfeatures_rx, cert_rx):
        service_security_selected = self._find_security_info(service_security, aef_id, api_id)

        # If _find_security_info returns an error, propagate it
        if isinstance(service_security_selected, dict) and "status" in service_security_selected:
            return service_security_selected
        
        if service_security_selected["selSecurityMethod"] == "PKI":
            # TOBEDONE
            print("To be done")

        return {"status": 200, "message": {"supportedFeatures": supportedfeatures_rx}}

    def _find_security_info(self, service_security, aef_id, api_id):
        if not service_security or "securityInfo" not in service_security:
            return self._problem_details(
                status=500,
                title="Internal Server Error",
                detail="Service security information is missing or malformed.",
                instance="/capif-security/v1/securityInfo",
                cause="MalformedServiceSecurity"
            )

        for entry in service_security["securityInfo"]:
            if entry["aefId"] == aef_id and entry["apiId"] == api_id:
                return entry

        return self._problem_details(
            status=404,
            title="Security Information Not Found",
            detail=f"No security information found for AEF ID: {aef_id}, API ID: {api_id}.",
            instance=f"/capif-security/v1/securityInfo/{aef_id}/{api_id}",
            cause="SecurityInfoNotFound"
        )

    def _problem_details(self, status, title, detail, instance, cause, invalidParams=None, supportedFeatures=None):
        """Generates the error message structure according to the ProblemDetails standard"""
        problem = {
            "type": "https://example.com/probs/security-error",
            "title": title,
            "status": status,
            "detail": detail,
            "instance": instance,
            "cause": cause
        }
        if invalidParams:
            problem["invalidParams"] = invalidParams
        if supportedFeatures:
            problem["supportedFeatures"] = supportedFeatures
        return problem