Skip to content
sdk.py 81.7 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
import os
import logging
import shutil
import subprocess
from requests.auth import HTTPBasicAuth
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# With InsecureRequestWarning silenced, HTTPS requests (e.g. to 'localhost') no longer print certificate warnings

from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    load_publickey,
    PKey,
    TYPE_RSA,
    X509Req,
    dump_publickey,
)
import requests
import json
from uuid import uuid4
import warnings
from requests.exceptions import RequestsDependencyWarning
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)

# Basic configuration of the root logger, applied once at import time.
logging.basicConfig(
    level=logging.NOTSET,  # Minimum severity level to record
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log message format
    handlers=[
        # Write records to a file; the relative path is resolved against the
        # current working directory of the process.
        logging.FileHandler("../Functionalities/sdk_logs.log"),
        logging.StreamHandler()  # Also show records on the console
    ]
)

class CAPIFInvokerConnector:
    """
    This class is responsible for onboarding an Invoker to CAPIF.
    """
    def __init__(self,
                 config_file: str ):

        config_file = os.path.abspath(config_file)
        # Cargar configuración desde archivo si es necesario
        config = self.__load_config_file(config_file)
        
        debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
        if debug_mode=="false": debug_mode=False
        
        # Inicializar logger
        self.logger = logging.getLogger(self.__class__.__name__)
        if debug_mode:
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.setLevel(logging.WARNING)
            
        

        
        urllib_logger = logging.getLogger("urllib3")
        if not debug_mode:
            urllib_logger.setLevel(logging.WARNING)
        else:
            urllib_logger.setLevel(logging.DEBUG)
        
        self.logger.info("Initializing CAPIFInvokerConnector")

        # Asignar valores desde variables de entorno o desde el archivo de configuración
        
        invoker_general_folder = os.path.abspath(os.getenv('invoker_folder', config.get('invoker_folder', '')).strip())
        
        capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip()
        register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip()
        capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip())
        capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip())
        capif_invoker_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip()
        capif_invoker_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip()
        capif_callback_url = os.getenv('CAPIF_CALLBACK_URL', config.get('capif_callback_url', '')).strip()
        
        csr_common_name = os.getenv('CSR_COMMON_NAME', config.get('csr_common_name', '')).strip()
        csr_organizational_unit = os.getenv('CSR_ORGANIZATIONAL_UNIT', config.get('csr_organizational_unit', '')).strip()
        csr_organization = os.getenv('CSR_ORGANIZATION', config.get('csr_organization', '')).strip()
        crs_locality = os.getenv('CRS_LOCALITY', config.get('crs_locality', '')).strip()
        csr_state_or_province_name = os.getenv('CSR_STATE_OR_PROVINCE_NAME', config.get('csr_state_or_province_name', '')).strip()
        csr_country_name = os.getenv('CSR_COUNTRY_NAME', config.get('csr_country_name', '')).strip()
        csr_email_address = os.getenv('CSR_EMAIL_ADDRESS', config.get('csr_email_address', '')).strip()
        
        self.invoker_folder=os.path.join(invoker_general_folder,capif_invoker_username)
        os.makedirs(self.invoker_folder, exist_ok=True)
        # Resto del código original para inicializar URLs y otros atributos
        

        if len(capif_https_port) == 0 or int(capif_https_port) == 443:
            self.capif_https_url = "https://" + capif_host.strip() + "/"
        else:
            self.capif_https_url = (
                "https://" + capif_host.strip() + ":" + capif_https_port.strip() + "/"
            )

        if len(capif_register_port) == 0:
            self.capif_register_url = "https://" + register_host.strip() + ":8084/"
        else:
            self.capif_register_url = (
                "https://" + register_host.strip() + ":" + capif_register_port.strip() + "/"
            )

        self.capif_callback_url = self.__add_trailing_slash_to_url_if_missing(
            capif_callback_url.strip()
        )
    
        self.capif_invoker_username = capif_invoker_username
        self.capif_invoker_password = capif_invoker_password
        
        self.csr_common_name = "invoker_" + csr_common_name
        self.csr_organizational_unit = csr_organizational_unit
        self.csr_organization = csr_organization
        self.crs_locality = crs_locality
        self.csr_state_or_province_name = csr_state_or_province_name
        self.csr_country_name = csr_country_name
        self.csr_email_address = csr_email_address
        self.capif_api_details_filename = "capif_api_security_context_details-"+self.capif_invoker_username+".json"
        #self.capif_api_details = self.__load_invoker_api_details()
        
        self.logger.info("CAPIFInvokerConnector initialized with the config.json parameters")

    def __load_config_file(self, config_file: str):
            """Carga el archivo de configuración."""
            try:
                with open(config_file, 'r') as file:
                    return json.load(file)
            except FileNotFoundError:
                self.logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.")
                return {}

    def __add_trailing_slash_to_url_if_missing(self, url):
        if url[len(url) - 1] != "/":
            url = url + "/"
        return url

    def register_and_onboard_Invoker(self) -> None:
        """Run the full register-and-onboard sequence for this Invoker.

        The steps, in order: create the key pair, save the CAPIF CA root file
        and obtain the auth info, onboard the invoker with the public key, and
        write the resulting invoker id and discover URL to file. Each step is
        delegated to a private helper of this class.

        Raises:
            Exception: Any error raised by a step is logged and re-raised
                unchanged.
        """
        self.logger.info("Registering and onboarding Invoker")
        try:
            invoker_public_key = self.__create_private_and_public_keys()
            auth_info = self.__save_capif_ca_root_file_and_get_auth_token()

            # Pull the three values used below out of the auth response.
            onboarding_url, discover_url, access_token = (
                auth_info[field]
                for field in ("ccf_onboarding_url", "ccf_discover_url", "access_token")
            )

            invoker_id = self.__onboard_invoker_to_capif_and_create_the_signed_certificate(
                invoker_public_key, onboarding_url, access_token
            )
            self.__write_to_file(invoker_id, discover_url)
        except Exception as e:
            self.logger.error(f"Error during Invoker registration and onboarding: {e}")
            raise
        else:
            self.logger.info("Invoker registered and onboarded successfully")

    def __load_invoker_api_details(self):
        self.logger.info("Loading Invoker API details")
        path = os.path.join(
            self.invoker_folder, 
            self.capif_api_details_filename
        )
        with open(
            path, "r"
        ) as openfile:
            return json.load(openfile)

    def __offboard_Invoker(self) -> None:
        self.logger.info("Offboarding Invoker")
        try:
            capif_api_details = self.__load_invoker_api_details()
            url = (
                self.capif_https_url
                + "api-invoker-management/v1/onboardedInvokers/"
                + capif_api_details["api_invoker_id"]
            )

            signed_key_crt_path = os.path.join(
                self.invoker_folder, 
                capif_api_details["user_name"] + ".crt"
            )

            private_key_path = os.path.join(
                self.invoker_folder, 
                "private.key"
            )

            path = os.path.join(
                self.invoker_folder, 
                "ca.crt"
            )
            response = requests.request(
                "DELETE",
                url,
                cert=(signed_key_crt_path, private_key_path),
                verify=path,
            )
Loading
Loading full blame…