Skip to content
Snippets Groups Projects
capif_provider_connector.py 60.7 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
from requests.exceptions import RequestsDependencyWarning
import warnings
import json
import requests
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    PKey,
    TYPE_RSA,
    X509Req
)
from OpenSSL.SSL import FILETYPE_PEM
import os
import logging
import shutil
from requests.auth import HTTPBasicAuth
import urllib3
JorgeEcheva26's avatar
JorgeEcheva26 committed
import ssl
import socket
JorgeEcheva26's avatar
JorgeEcheva26 committed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


warnings.filterwarnings("ignore", category=RequestsDependencyWarning)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
# Basic logger configuration
JorgeEcheva26's avatar
JorgeEcheva26 committed

log_path = 'logs/sdk_logs.log'

log_dir = os.path.dirname(log_path)

if not os.path.exists(log_dir):
    os.makedirs(log_dir)

logging.basicConfig(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
    level=logging.NOTSET,  # Minimum severity level to log
    # Log message format
JorgeEcheva26's avatar
JorgeEcheva26 committed
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        logging.FileHandler(log_path),  # Logs to a file
        logging.StreamHandler()  # Also outputs to the console
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
class capif_provider_connector:
JorgeEcheva26's avatar
JorgeEcheva26 committed
    """
    Τhis class is responsible for onboarding an exposer (eg. NEF emulator) to CAPIF
    """

    def __init__(self, config_file: str):
        """
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        Initializes the CAPIFProvider connector with the parameters specified in the configuration file.
JorgeEcheva26's avatar
JorgeEcheva26 committed
        """
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Load configuration from file if necessary
JorgeEcheva26's avatar
JorgeEcheva26 committed
        config_file = os.path.abspath(config_file)
        self.config_path = os.path.dirname(config_file)+"/"
        config = self.__load_config_file(config_file)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        if debug_mode == "false":
            debug_mode = False
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        else:
            debug_mode = True

        # Initialize logger for this class
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger = logging.getLogger(self.__class__.__name__)
        if debug_mode:
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.setLevel(logging.WARNING)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Set logging level for urllib based on debug_mode
JorgeEcheva26's avatar
JorgeEcheva26 committed
        urllib_logger = logging.getLogger("urllib3")
        if not debug_mode:
            urllib_logger.setLevel(logging.WARNING)
        else:
            urllib_logger.setLevel(logging.DEBUG)

        try:
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Retrieve provider configuration from JSON or environment variables
            provider_config = config.get('provider', {})
JorgeEcheva26's avatar
JorgeEcheva26 committed
            provider_general_folder = os.path.abspath(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                os.getenv('PROVIDER_FOLDER', provider_config.get('provider_folder', '')).strip())

            capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip()
            capif_register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip()
            capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip())
            capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip())
            capif_provider_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip()
            capif_provider_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip()

            # Get CSR (Certificate Signing Request) details from config or environment variables
            cert_generation = provider_config.get('cert_generation', {})
            csr_common_name = os.getenv('PROVIDER_CSR_COMMON_NAME', cert_generation.get('csr_common_name', '')).strip()
            csr_organizational_unit = os.getenv('PROVIDER_CSR_ORGANIZATIONAL_UNIT', cert_generation.get('csr_organizational_unit', '')).strip()
            csr_organization = os.getenv('PROVIDER_CSR_ORGANIZATION', cert_generation.get('csr_organization', '')).strip()
JorgeEcheva26's avatar
JorgeEcheva26 committed
            csr_locality = os.getenv('PROVIDER_CSR_LOCALITY', cert_generation.get('csr_locality', '')).strip()
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            csr_state_or_province_name = os.getenv('PROVIDER_CSR_STATE_OR_PROVINCE_NAME', cert_generation.get('csr_state_or_province_name', '')).strip()
            csr_country_name = os.getenv('PROVIDER_CSR_COUNTRY_NAME', cert_generation.get('csr_country_name', '')).strip()
            csr_email_address = os.getenv('PROVIDER_CSR_EMAIL_ADDRESS', cert_generation.get('csr_email_address', '')).strip()

            # Retrieve provider specific values (APFs, AEFs)
            supported_features = os.getenv('PROVIDER_SUPPORTED_FEATURES', provider_config.get('supported_features', '')).strip()
            if not supported_features:
                supported_features = "0"
            apfs = os.getenv('PROVIDER_APFS', provider_config.get('apfs', '')).strip()
            aefs = os.getenv('PROVIDER_AEFS', provider_config.get('aefs', '')).strip()
            api_description_path = os.path.abspath(os.getenv('PROVIDER_API_DESCRIPTION_PATH', provider_config.get('api_description_path', '')).strip())
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

            # Check required fields and log warnings/errors
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if not capif_host:
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                self.logger.warning("CAPIF_HOST is not provided; defaulting to an empty string")
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if not capif_provider_username:
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                self.logger.error("CAPIF_PROVIDER_USERNAME is required but not provided")
JorgeEcheva26's avatar
JorgeEcheva26 committed
                raise ValueError("CAPIF_PROVIDER_USERNAME is required")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Setup the folder to store provider files (e.g., certificates)
            self.provider_folder = os.path.join(provider_general_folder, capif_provider_username)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            os.makedirs(self.provider_folder, exist_ok=True)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Set attributes for provider credentials and configuration
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.capif_host = capif_host.strip()
            self.capif_provider_username = capif_provider_username
            self.capif_provider_password = capif_provider_password
            self.capif_register_host = capif_register_host
            self.capif_register_port = capif_register_port
            self.csr_common_name = csr_common_name
            self.csr_organizational_unit = csr_organizational_unit
            self.csr_organization = csr_organization
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.csr_locality = csr_locality
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.csr_state_or_province_name = csr_state_or_province_name
            self.csr_country_name = csr_country_name
            self.csr_email_address = csr_email_address
            self.supported_features = supported_features
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.aefs = int(aefs)
            self.apfs = int(apfs)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

            # Get publish request details from config or environment variables
            publish_req_config = provider_config.get('publish_req', {})
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.publish_req = {
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                "service_api_id": os.getenv('PUBLISH_REQ_SERVICE_API_ID', publish_req_config.get('service_api_id', '')).strip(),
                "publisher_apf_id": os.getenv('PUBLISH_REQ_PUBLISHER_APF_ID', publish_req_config.get('publisher_apf_id', '')).strip(),
                "publisher_aefs_ids": os.getenv('PUBLISH_REQ_PUBLISHER_AEFS_IDS', publish_req_config.get('publisher_aefs_ids', ''))
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Set the path for the API description file
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.api_description_path = api_description_path

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Set the CAPIF HTTPS port and construct CAPIF URLs
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.capif_https_port = str(capif_https_port)

            self.provider_capif_ids = {}
JorgeEcheva26's avatar
JorgeEcheva26 committed
            path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            if os.path.exists(path_prov_funcs):
JorgeEcheva26's avatar
JorgeEcheva26 committed
                self.provider_capif_ids = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
            path_published = os.path.join(self.provider_folder, "provider_service_ids.json")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            if os.path.exists(path_published):
JorgeEcheva26's avatar
JorgeEcheva26 committed
                self.provider_service_ids = self.__load_config_file(path_published)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed

            # Construct the CAPIF HTTPS URL
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if len(self.capif_https_port) == 0 or int(self.capif_https_port) == 443:
                self.capif_https_url = f"https://{capif_host.strip()}/"
            else:
                self.capif_https_url = f"https://{capif_host.strip()}:{self.capif_https_port.strip()}/"

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Construct the CAPIF register URL
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if len(capif_register_port) == 0:
                self.capif_register_url = f"https://{capif_register_host.strip()}:8084/"
            else:
                self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/"
JorgeEcheva26's avatar
JorgeEcheva26 committed
            events_config = provider_config.get('events', {})
            self.events_description = os.getenv('PROVIDER_EVENTS_DESCRIPTION', events_config.get('description', ''))
            self.events_filter = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('eventFilters', ''))
            self.notification_destination = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('notificationDestination', ''))
            self.websock_notif_config = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('websockNotifConfig', ''))
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Log initialization success message
            self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters")
JorgeEcheva26's avatar
JorgeEcheva26 committed

        except Exception as e:
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Catch and log any exceptions that occur during initialization
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.error(f"Error during initialization: {e}")
            raise

    def __store_certificate(self) -> None:
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info("Retrieving capif_cert_server.pem...")
JorgeEcheva26's avatar
JorgeEcheva26 committed

        try:
JorgeEcheva26's avatar
JorgeEcheva26 committed
            # Crear un contexto SSL que no valide certificados
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            with socket.create_connection((self.capif_host, self.capif_https_port)) as sock:
                with context.wrap_socket(sock, server_hostname=self.capif_host) as ssock:
                    cert = ssock.getpeercert(binary_form=True)
                    cert_file = os.path.join(self.provider_folder, "capif_cert_server.pem")
                    with open(cert_file, "wb") as f:
                        f.write(ssl.DER_cert_to_PEM_cert(cert).encode())
            self.logger.info("cert_server.pem successfully generated!")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        except Exception as e:
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.error(f"Error occurred while retrieving certificate: {e}")
JorgeEcheva26's avatar
JorgeEcheva26 committed
            raise

    def __load_config_file(self, config_file: str):
        """Carga el archivo de configuración."""
        try:
            with open(config_file, 'r') as file:
                return json.load(file)
        except FileNotFoundError:
            self.logger.warning(
                f"Configuration file {config_file} not found. Using defaults or environment variables.")
            return {}

    def __create_private_and_public_keys(self, api_prov_func_role) -> bytes:
        """
        Creates private and public keys in the certificates folder.
        :return: The contents of the public key
        """
        private_key_path = os.path.join(
            self.provider_folder, f"{api_prov_func_role}_private_key.key")
        csr_file_path = os.path.join(
            self.provider_folder, f"{api_prov_func_role}_public.csr")

        # Create key pair
        key = PKey()
        key.generate_key(TYPE_RSA, 2048)

        # Create CSR
        req = X509Req()
        subject = req.get_subject()
        subject.CN = api_prov_func_role.lower()
        subject.O = self.csr_organization
        subject.OU = self.csr_organizational_unit
JorgeEcheva26's avatar
JorgeEcheva26 committed
        subject.L = self.csr_locality
JorgeEcheva26's avatar
JorgeEcheva26 committed
        subject.ST = self.csr_state_or_province_name
        subject.C = self.csr_country_name
        subject.emailAddress = self.csr_email_address

        req.set_pubkey(key)
        req.sign(key, "sha256")

        # Write CSR and private key to files
        with open(csr_file_path, "wb") as csr_file:
            public_key = dump_certificate_request(FILETYPE_PEM, req)
            csr_file.write(public_key)

        with open(private_key_path, "wb") as private_key_file:
            private_key_file.write(dump_privatekey(FILETYPE_PEM, key))

        return public_key

    def __onboard_exposer_to_capif(self, access_token, capif_onboarding_url, supp_features):
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            "Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        url = f"{self.capif_https_url}{capif_onboarding_url}"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Create the list of roles without indexing
JorgeEcheva26's avatar
JorgeEcheva26 committed
        roles = ["AMF"]
        for n in range(1, self.aefs + 1):
            roles.append("AEF")

        for n in range(1, self.apfs + 1):
            roles.append("APF")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Build the payload with the non-indexed roles
JorgeEcheva26's avatar
JorgeEcheva26 committed
        payload = {
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            "apiProvFuncs": [
JorgeEcheva26's avatar
JorgeEcheva26 committed
                {"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role,
                    "apiProvFuncInfo": f"{role.lower()}"}
                for role in roles
            ],
            "apiProvDomInfo": "This is provider",
            "suppFeat": supp_features,
JorgeEcheva26's avatar
JorgeEcheva26 committed
            "failReason": "string",
            "regSec": access_token,
        }

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Generate the indexed roles for certificate creation
JorgeEcheva26's avatar
JorgeEcheva26 committed
        indexedroles = ["AMF"]
        for n in range(1, self.aefs + 1):
            indexedroles.append(f"AEF-{n}")

        for n in range(1, self.apfs + 1):
            indexedroles.append(f"APF-{n}")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Save the public keys and generate certificates with indexed roles
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        for i, api_func in enumerate(payload["apiProvFuncs"]):
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Generate public keys with the indexed role, but do not update the payload with the indexed role
JorgeEcheva26's avatar
JorgeEcheva26 committed
            public_key = self.__create_private_and_public_keys(indexedroles[i])

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Assign the public key to the payload
JorgeEcheva26's avatar
JorgeEcheva26 committed
            api_func["regInfo"]["apiProvPubKey"] = public_key.decode("utf-8")
        try:
            response = requests.post(
                url,
                headers=headers,
                data=json.dumps(payload),
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )
            response.raise_for_status()
            self.logger.info(
                "Provider onboarded and signed certificate obtained successfully")
            return response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Onboarding failed: {e} - Response: {response.text}")
            raise

    def __write_to_file(self, onboarding_response, capif_registration_id, publish_url):
        self.logger.info("Saving the most relevant onboarding data")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Generate the indexed roles for correspondence
JorgeEcheva26's avatar
JorgeEcheva26 committed
        indexedroles = ["AMF"]
        for n in range(1, self.aefs + 1):
            indexedroles.append(f"AEF-{n}")

        for n in range(1, self.apfs + 1):
            indexedroles.append(f"APF-{n}")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Save the certificates with the indexed names
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        for i, func_profile in enumerate(onboarding_response["apiProvFuncs"]):
JorgeEcheva26's avatar
JorgeEcheva26 committed
            role = indexedroles[i].lower()
            cert_path = os.path.join(self.provider_folder, f"{role}.crt")
            with open(cert_path, "wb") as cert_file:
                cert_file.write(
                    func_profile["regInfo"]["apiProvCert"].encode("utf-8"))

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Save the provider details
JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        with open(provider_details_path, "w") as outfile:
            data = {
                "capif_registration_id": capif_registration_id,
                "publish_url": publish_url,
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                **{f"{indexedroles[i]}": api_prov_func["apiProvFuncId"]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                   for i, api_prov_func in enumerate(onboarding_response["apiProvFuncs"])}
JorgeEcheva26's avatar
JorgeEcheva26 committed
            }
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            for i, api_prov_func in enumerate(onboarding_response["apiProvFuncs"]):
                self.provider_capif_ids[indexedroles[i]] = api_prov_func["apiProvFuncId"]

JorgeEcheva26's avatar
JorgeEcheva26 committed
            json.dump(data, outfile, indent=4)
        self.logger.info("Data saved")

    def __save_capif_ca_root_file_and_get_auth_token(self):
        url = f"{self.capif_register_url}getauth"
        self.logger.info(
            "Saving CAPIF CA root file and getting auth token with user and password given by the CAPIF administrator")

        try:
            response = requests.get(
                url,
                headers={"Content-Type": "application/json"},
                auth=HTTPBasicAuth(self.capif_provider_username,
                                   self.capif_provider_password),
                verify=False
            )
            response.raise_for_status()

            self.logger.info("Authorization acquired successfully")

            response_payload = response.json()
            ca_root_file_path = os.path.join(self.provider_folder, "ca.crt")

            with open(ca_root_file_path, "wb") as ca_root_file:
                ca_root_file.write(response_payload["ca_root"].encode("utf-8"))

            self.logger.info(
                "CAPIF CA root file saved and auth token obtained successfully")
            return response_payload

        except requests.exceptions.RequestException as e:
            self.logger.error(
                f"Error acquiring authorization: {e} - Response: {response.text}")
            raise

    def onboard_provider(self, supp_features="0") -> None:
JorgeEcheva26's avatar
JorgeEcheva26 committed
        """
        Retrieves and stores the certificate from CAPIF, acquires authorization, and registers the provider.
        """
        # Store the certificate
        self.__store_certificate()

        # Retrieve CA root file and get authorization token
        capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token()

        # Extract necessary information
        capif_onboarding_url = capif_postauth_info["ccf_api_onboarding_url"]
        access_token = capif_postauth_info["access_token"]
        ccf_publish_url = capif_postauth_info["ccf_publish_url"]

        # Onboard provider to CAPIF
        onboarding_response = self.__onboard_exposer_to_capif(
            access_token, capif_onboarding_url, supp_features
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        # Save onboarding details to file
        capif_registration_id = onboarding_response["apiProvDomId"]
        self.__write_to_file(
            onboarding_response, capif_registration_id, ccf_publish_url
        )

    def publish_services(self) -> dict:
        """
        Publishes services to CAPIF and returns the published services dictionary.

        :param service_api_description_json_full_path: The full path of the service_api_description.json containing
        the endpoints to be published.
        :return: The published services dictionary that was saved in CAPIF.
        """
        self.logger.info("Starting the service publication process")

        # Load provider details
        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Loading provider details from {provider_details_path}")

JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed

        publish_url = provider_details["publish_url"]

        chosenAPFsandAEFs = self.publish_req

        APF_api_prov_func_id = chosenAPFsandAEFs["publisher_apf_id"]
        AEFs_list = chosenAPFsandAEFs["publisher_aefs_ids"]

        apf_number = None
        for key, value in provider_details.items():
            if value == APF_api_prov_func_id and key.startswith("APF-"):
                apf_inter = key.split("-")[1]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Obtain the APF number
JorgeEcheva26's avatar
JorgeEcheva26 committed
                apf_number = apf_inter.split("_")[0]
                break

        if apf_number is None:
            self.logger.error(
                f"No matching APF found for publisher_apf_id: {APF_api_prov_func_id}")
            raise ValueError("Invalid publisher_apf_id")
        service_api_description_json_full_path = self.api_description_path
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Read and modify the API description
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Reading and modifying service API description from {service_api_description_json_full_path}")

        try:
            with open(service_api_description_json_full_path, "r") as service_file:
                data = json.load(service_file)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Verifying that the number of AEFs is equal to the aefProfiles
JorgeEcheva26's avatar
JorgeEcheva26 committed
                if len(AEFs_list) != len(data.get("aefProfiles", [])):
                    self.logger.error(
                        "The number of AEFs in publisher_aefs_ids does not match the number of profiles in aefProfiles")
                    raise ValueError(
                        "Mismatch between number of AEFs and profiles")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Assigning each AEF
JorgeEcheva26's avatar
JorgeEcheva26 committed
                for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list):
                    if not isinstance(profile, dict):  # Verificar que profile sea un diccionario
                        raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}")

                    profile["aefId"] = aef_id  # Asignar el ID de AEF

                    versions = profile.get("versions")  # Obtener versions

                    i = 1
                    for version in versions:  # Iterar sobre cada versión
                        if not isinstance(version, dict):  # Verificar que cada versión sea un diccionario
                            raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}")

                        # Obtener nombres existentes de operaciones personalizadas
                        existing_operations = {
                            op["custOpName"].strip()
                            for op in version.get("custOperations", []) if isinstance(op, dict)
                        }

                        # Verificar y agregar `check-authentication` si no existe
                        if "check-authentication" not in existing_operations and i == 1:
                            version.setdefault("custOperations", []).append({
                                "commType": "REQUEST_RESPONSE",
                                "custOpName": "check-authentication",
                                "description": "Check authentication request."
                            })

                        # Verificar y agregar `revoke-authentication` si no existe
                        if "revoke-authentication" not in existing_operations and i == 1:
                            version.setdefault("custOperations", []).append({
                                "commType": "REQUEST_RESPONSE",
                                "custOpName": "revoke-authentication",
                                "description": "Revoke authorization for service APIs."
                            })
JorgeEcheva26's avatar
JorgeEcheva26 committed
                self.logger.info(
                    "Service API description modified successfully")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Saving changes into the file
JorgeEcheva26's avatar
JorgeEcheva26 committed
                with open(service_api_description_json_full_path, "w") as service_file:
                    json.dump(data, service_file, indent=4)

        except FileNotFoundError:
            self.logger.error(
                f"Service API description file not found: {service_api_description_json_full_path}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(
                f"Error decoding JSON from file {service_api_description_json_full_path}: {e}")
            raise
        except ValueError as e:
            self.logger.error(f"Error with the input data: {e}")
            raise

        # Publish services
        url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}"
        cert = (
            os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
            os.path.join(self.provider_folder,
                         f"APF-{apf_number}_private_key.key"),
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        self.logger.info(f"Publishing services to URL: {url}")
        try:
            response = requests.post(
                url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(data),
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )
            response.raise_for_status()
            self.logger.info("Services published successfully")

            # Save response to file
            capif_response_text = response.text

            capif_response_json = json.loads(capif_response_text)

            # Default name if apiName is missing
            file_name = capif_response_json.get("apiName", "default_name")
            id = capif_response_json.get("apiId", "default_id")
            output_path = os.path.join(
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                self.provider_folder, f"capif_{file_name}_{id}_api.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed

            with open(output_path, "w") as outfile:
                outfile.write(capif_response_text)
            self.logger.info(f"CAPIF response saved to {output_path}")
            output_path = os.path.join(

                self.provider_folder, "provider_service_ids.json")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Read the existing file of published APIs
            provider_service_ids = {}
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if os.path.exists(output_path):
                with open(output_path, "r") as outfile:
                    provider_service_ids = json.load(outfile)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Add the newly published API

            provider_service_ids[file_name] = id

            self.provider_service_ids = provider_service_ids
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Write the updated file of published APIs
JorgeEcheva26's avatar
JorgeEcheva26 committed
            with open(output_path, "w") as outfile:
                json.dump(provider_service_ids, outfile, indent=4)
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.info(
                f"API '{file_name}' with ID '{id}' added to Published Apis.")
            return json.loads(capif_response_text)

        except requests.RequestException as e:
            self.logger.error(
                f"Request to CAPIF failed: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error during service publication: {e} - Response: {response.text}")
            raise

    def unpublish_service(self) -> dict:
        """
        Publishes services to CAPIF and returns the published services dictionary.

        :param service_api_description_json_full_path: The full path of the service_api_description.json containing
        the endpoints to be published.
        :return: The published services dictionary that was saved in CAPIF.
        """
        self.logger.info("Starting the service unpublication process")
        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Loading provider details from {provider_details_path}")

JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        publish_url = provider_details["publish_url"]

        # Load provider details
        publish = self.publish_req
        api_id = "/" + publish["service_api_id"]
        APF_api_prov_func_id = publish["publisher_apf_id"]
        apf_number = None
        for key, value in provider_details.items():
            if value == APF_api_prov_func_id and key.startswith("APF-"):
                apf_inter = key.split("-")[1]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Get the number of APFs
JorgeEcheva26's avatar
JorgeEcheva26 committed
                apf_number = apf_inter.split("_")[0]
                break

        if apf_number is None:
            self.logger.error(
                f"No matching APF found for publisher_apf_id: {APF_api_prov_func_id}")
            raise ValueError("Invalid publisher_apf_id")

        self.logger.info(
            f"Loading provider details from {provider_details_path}")

        url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}"

        cert = (
            os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
            os.path.join(self.provider_folder, f"APF-{apf_number}_private_key.key"),
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        self.logger.info(f"Unpublishing service to URL: {url}")

        try:
            response = requests.delete(
                url,
                headers={"Content-Type": "application/json"},
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )

            response.raise_for_status()

            directory = self.provider_folder

            # Iterar sobre todos los archivos en el directorio
            for filename in os.listdir(directory):
                path = os.path.join(directory, filename)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Check if the file starts with 'CAPIF-'
JorgeEcheva26's avatar
JorgeEcheva26 committed

                if filename.startswith("CAPIF-") and publish["service_api_id"] in filename:

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                    # Exit the loop if the file is deleted
JorgeEcheva26's avatar
JorgeEcheva26 committed
                    os.remove(path)
                    break

            output_path = os.path.join(
                self.provider_folder, "provider_service_ids.json")
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Read the existing file of published APIs
            provider_service_ids = {}
JorgeEcheva26's avatar
JorgeEcheva26 committed
            if os.path.exists(output_path):
                with open(output_path, "r") as outfile:
                    provider_service_ids = json.load(outfile)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # API ID you want to delete
            # Replace with the specific ID
JorgeEcheva26's avatar
JorgeEcheva26 committed
            api_id_to_delete = publish["service_api_id"]

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Search and delete the API by its ID
JorgeEcheva26's avatar
JorgeEcheva26 committed
            api_name_to_delete = None
            for name, id in provider_service_ids.items():
JorgeEcheva26's avatar
JorgeEcheva26 committed
                if id == api_id_to_delete:
                    api_name_to_delete = name
                    break

            if api_name_to_delete:
                del provider_service_ids[api_name_to_delete]
JorgeEcheva26's avatar
JorgeEcheva26 committed
                self.logger.info(
                    f"API with ID '{api_id_to_delete}' removed from Published Apis.")
            else:
                self.logger.warning(
                    f"API with ID '{api_id_to_delete}' not found in Published Apis.")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Write the updated file of published APIs
JorgeEcheva26's avatar
JorgeEcheva26 committed
            with open(output_path, "w") as outfile:

                json.dump(provider_service_ids, outfile, indent=4)
            self.provider_service_ids = provider_service_ids
JorgeEcheva26's avatar
JorgeEcheva26 committed
            self.logger.info("Services unpublished successfully")

        except requests.RequestException as e:
            self.logger.error(
                f"Request to CAPIF failed: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error during service unpublication: {e} - Response: {response.text}")
            raise

    def get_service(self) -> dict:
        """
        Publishes services to CAPIF and returns the published services dictionary.

        :param service_api_description_json_full_path: The full path of the service_api_description.json containing
        the endpoints to be published.
        :return: The published services dictionary that was saved in CAPIF.
        """
        self.logger.info("Starting the service unpublication process")

        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Loading provider details from {provider_details_path}")

JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        publish_url = provider_details["publish_url"]

        chosenAPFsandAEFs = self.publish_req

        APF_api_prov_func_id = chosenAPFsandAEFs["publisher_apf_id"]

        api_id = "/" + chosenAPFsandAEFs["service_api_id"]

        apf_number = None
        for key, value in provider_details.items():
            if value == APF_api_prov_func_id and key.startswith("APF-"):
                apf_inter = key.split("-")[1]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Get the number of apfs
JorgeEcheva26's avatar
JorgeEcheva26 committed
                apf_number = apf_inter.split("_")[0]
                break

        if apf_number is None:
            self.logger.error(
                f"No matching APF found for publisher_apf_id: {APF_api_prov_func_id}")
            raise ValueError("Invalid publisher_apf_id")

        url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}"

        cert = (
            os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
            os.path.join(self.provider_folder,
JorgeEcheva26's avatar
JorgeEcheva26 committed
                         f"APF-{apf_number}_private_key.key"),
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        self.logger.info(f"Getting service to URL: {url}")

        try:
            response = requests.get(
                url,
                headers={"Content-Type": "application/json"},
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )

            response.raise_for_status()

            self.logger.info("Service received successfully")
            path = os.path.join(self.provider_folder, "service_received.json")
            with open(path, 'w') as f:
                json_data = json.loads(response.text)
                json.dump(json_data, f, indent=4)
            self.logger.info(f"Service saved in {path}")

        except requests.RequestException as e:
            self.logger.error(
                f"Request to CAPIF failed: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error during service getter: {e} - Response: {response.text}")
            raise

    def get_all_services(self) -> dict:
        """
        Publishes services to CAPIF and returns the published services dictionary.

        :param service_api_description_json_full_path: The full path of the service_api_description.json containing
        the endpoints to be published.
        :return: The published services dictionary that was saved in CAPIF.
        """
        self.logger.info("Starting the service publication process")

        # Load provider details
JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Loading provider details from {provider_details_path}")

JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        publish_url = provider_details["publish_url"]

        chosenAPFsandAEFs = self.publish_req

        APF_api_prov_func_id = chosenAPFsandAEFs["publisher_apf_id"]

        apf_number = None
        for key, value in provider_details.items():
            if value == APF_api_prov_func_id and key.startswith("APF-"):
                apf_inter = key.split("-")[1]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Get the number of APFs
JorgeEcheva26's avatar
JorgeEcheva26 committed
                apf_number = apf_inter.split("_")[0]
                break

        if apf_number is None:
            self.logger.error(
                f"No matching APF found for publisher_apf_id: {APF_api_prov_func_id}")
            raise ValueError("Invalid publisher_apf_id")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Read and modify the description of the API services
JorgeEcheva26's avatar
JorgeEcheva26 committed

        # Publish services
        url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}"
        cert = (
            os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
            os.path.join(self.provider_folder,
JorgeEcheva26's avatar
JorgeEcheva26 committed
                         f"APF-{apf_number}_private_key.key"),
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        self.logger.info(f"Getting services to URL: {url}")

        try:
            response = requests.get(
                url,
                headers={"Content-Type": "application/json"},
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )
            response.raise_for_status()
            self.logger.info("Services received successfully")

            path = os.path.join(self.provider_folder, "service_received.json")
            with open(path, 'w') as f:
                json_data = json.loads(response.text)
                json.dump(json_data, f, indent=4)
            self.logger.info(f"Services saved in {path}")

            # Save response to file

        except requests.RequestException as e:
            self.logger.error(
                f"Request to CAPIF failed: {e} - Response: {response.text}")
            raise
        except Exception as e:
            self.logger.error(
                f"Unexpected error during services reception: {e} - Response: {response.text}")
            raise

    def update_service(self) -> dict:
        """
        Publishes services to CAPIF and returns the published services dictionary.

        :param service_api_description_json_full_path: The full path of the service_api_description.json containing
        the endpoints to be published.
        :return: The published services dictionary that was saved in CAPIF.
        """
        self.logger.info("Starting the service publication process")

        # Load provider details
        # Load provider details
        provider_details_path = os.path.join(
            self.provider_folder, "provider_capif_ids.json")
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Loading provider details from {provider_details_path}")

JorgeEcheva26's avatar
JorgeEcheva26 committed
        provider_details = self._load_provider_api_details()
JorgeEcheva26's avatar
JorgeEcheva26 committed
        publish_url = provider_details["publish_url"]

        chosenAPFsandAEFs = self.publish_req

        APF_api_prov_func_id = chosenAPFsandAEFs["publisher_apf_id"]
        AEFs_list = chosenAPFsandAEFs["publisher_aefs_ids"]

        apf_number = None
        for key, value in provider_details.items():
            if value == APF_api_prov_func_id and key.startswith("APF-"):
                apf_inter = key.split("-")[1]
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Get the number of APFs
JorgeEcheva26's avatar
JorgeEcheva26 committed
                apf_number = apf_inter.split("_")[0]
                break

        if apf_number is None:
            self.logger.error(
                f"No matching APF found for publisher_apf_id: {APF_api_prov_func_id}")
            raise ValueError("Invalid publisher_apf_id")

        service_api_description_json_full_path = self.api_description_path
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
        # Read and modify the description of the API services
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info(
            f"Reading and modifying service API description from {service_api_description_json_full_path}")

        try:
            with open(service_api_description_json_full_path, "r") as service_file:
                data = json.load(service_file)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # verify the aefs number corresponds to the aefProfiles
JorgeEcheva26's avatar
JorgeEcheva26 committed
                if len(AEFs_list) != len(data.get("aefProfiles", [])):
                    self.logger.error(
                        "The number of AEFs in publisher_aefs_ids does not match the number of profiles in aefProfiles")
                    raise ValueError(
                        "Mismatch between number of AEFs and profiles")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Asing the chosen AEFs
JorgeEcheva26's avatar
JorgeEcheva26 committed
                for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list):
                    if not isinstance(profile, dict):  # Verificar que profile sea un diccionario
                        raise TypeError(f"Expected profile to be a dict, got {type(profile).__name__}")

                    profile["aefId"] = aef_id  # Asignar el ID de AEF

                    versions = profile.get("versions")  # Obtener versions
                    i = 1
                    for version in versions:  # Iterar sobre cada versión
                        if not isinstance(version, dict):  # Verificar que cada versión sea un diccionario
                            raise TypeError(f"Expected each version to be a dict, got {type(version).__name__}")

                        # Obtener nombres existentes de operaciones personalizadas
                        existing_operations = {
                            op["custOpName"].strip()
                            for op in version.get("custOperations", []) if isinstance(op, dict)
                        }

                        # Verificar y agregar `check-authentication` si no existe
                        if "check-authentication" not in existing_operations and i == 1:
                            version.setdefault("custOperations", []).append({
                                "commType": "REQUEST_RESPONSE",
                                "custOpName": "check-authentication",
                                "description": "Check authentication request."
                            })

                        # Verificar y agregar `revoke-authentication` si no existe
                        if "revoke-authentication" not in existing_operations and i == 1:
                            version.setdefault("custOperations", []).append({
                                "commType": "REQUEST_RESPONSE",
                                "custOpName": "revoke-authentication",
                                "description": "Revoke authorization for service APIs."
                            })
JorgeEcheva26's avatar
JorgeEcheva26 committed
                self.logger.info(
                    "Service API description modified successfully")

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Save changes
JorgeEcheva26's avatar
JorgeEcheva26 committed
                with open(service_api_description_json_full_path, "w") as service_file:
                    json.dump(data, service_file, indent=4)

        except FileNotFoundError:
            self.logger.error(
                f"Service API description file not found: {service_api_description_json_full_path}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(
                f"Error decoding JSON from file {service_api_description_json_full_path}: {e}")
            raise
        except ValueError as e:
            self.logger.error(f"Error with the input data: {e}")
            raise
        api_id = "/" + chosenAPFsandAEFs["service_api_id"]
        # Publish services
        url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}"
        cert = (
            os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
            os.path.join(self.provider_folder,
JorgeEcheva26's avatar
JorgeEcheva26 committed
                         f"APF-{apf_number}_private_key.key"),
JorgeEcheva26's avatar
JorgeEcheva26 committed
        )

        self.logger.info(f"Publishing services to URL: {url}")

        try:
            response = requests.put(
                url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(data),
                cert=cert,
                verify=os.path.join(self.provider_folder, "ca.crt"),
            )
            response.raise_for_status()
            self.logger.info("Services updated successfully")

            # Save response to file
            capif_response_text = response.text

            capif_response_json = json.loads(capif_response_text)

            # Default name if apiName is missing
            file_name = capif_response_json.get("apiName", "default_name")
            id = capif_response_json.get("apiId", "default_id")
            directory = self.provider_folder

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
            # Iterate over all files in the directory
JorgeEcheva26's avatar
JorgeEcheva26 committed
            for filename in os.listdir(directory):
                path = os.path.join(directory, filename)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                # Check if the file starts with 'CAPIF-'
JorgeEcheva26's avatar
JorgeEcheva26 committed

                if filename.startswith("CAPIF-") and id in filename:

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
                    # Exit the loop if the file is deleted
JorgeEcheva26's avatar
JorgeEcheva26 committed
                    os.remove(path)