Skip to content
capif_provider_connector.py 56 KiB
Newer Older
JorgeEcheva26's avatar
JorgeEcheva26 committed
from requests.exceptions import RequestsDependencyWarning
import warnings
import json
import requests
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    PKey,
    TYPE_RSA,
    X509Req
)
from OpenSSL.SSL import FILETYPE_PEM
import os
import logging
import shutil
import subprocess
from requests.auth import HTTPBasicAuth
import urllib3
JorgeEcheva26's avatar
JorgeEcheva26 committed
import ssl
import socket
JorgeEcheva26's avatar
JorgeEcheva26 committed
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


warnings.filterwarnings("ignore", category=RequestsDependencyWarning)

Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
# Basic logger configuration: every record is written to a log file and
# echoed to the console.

# The path is relative, so the file is created under the current working directory.
log_path = 'logs/sdk_logs.log'

log_dir = os.path.dirname(log_path)

# exist_ok=True replaces the "check, then create" pattern, which can fail with
# FileExistsError when two processes import this module at the same time.
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.NOTSET,  # Minimum severity level to log
    # Log message format
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),  # Logs to a file
        logging.StreamHandler(),  # Also outputs to the console
    ],
)
Jorge Echevarria Uribarri's avatar
Jorge Echevarria Uribarri committed
class capif_provider_connector:
JorgeEcheva26's avatar
JorgeEcheva26 committed
    """
    This class is responsible for onboarding an exposer (e.g. NEF emulator) to CAPIF
    """

    def __init__(self, config_file: str):
        """
        Initializes the CAPIFProvider connector with the parameters specified in the configuration file.

        Each setting is looked up in the environment first; when the
        environment variable is not set, the value from the configuration
        file is used instead.

        Args:
            config_file: Path to the SDK configuration file. It is made
                absolute, and its directory is stored in ``self.config_path``.

        Raises:
            ValueError: If the provider username is missing, or if the
                ``apfs`` / ``aefs`` settings are not integers.
            Exception: Any other error raised while reading the settings is
                logged and re-raised unchanged.
        """
        # Load configuration from file if necessary
        config_file = os.path.abspath(config_file)
        self.config_path = os.path.dirname(config_file)+"/"
        config = self.__load_config_file(config_file)

        def _setting(env_name, section, key, default=''):
            """Return the env var if set, else ``section[key]``, as a stripped string."""
            value = os.getenv(env_name, section.get(key, default))
            if value is None:
                return ''
            # Config values may be numbers or booleans rather than strings;
            # str() keeps the .strip() call from failing on them.
            return str(value).strip()

        # Anything other than the literal "false" (case-insensitive) enables debug mode.
        debug_mode = _setting('DEBUG_MODE', config, 'debug_mode', 'False').lower() != "false"

        # Initialize logger for this class
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.DEBUG if debug_mode else logging.WARNING)

        # Set logging level for urllib based on debug_mode
        urllib_logger = logging.getLogger("urllib3")
        urllib_logger.setLevel(logging.DEBUG if debug_mode else logging.WARNING)

        try:
            # Retrieve provider configuration from JSON or environment variables
            provider_config = config.get('provider', {})
            provider_general_folder = os.path.abspath(
                _setting('PROVIDER_FOLDER', provider_config, 'provider_folder'))

            capif_host = _setting('CAPIF_HOST', config, 'capif_host')
            capif_register_host = _setting('REGISTER_HOST', config, 'register_host')
            capif_https_port = _setting('CAPIF_HTTPS_PORT', config, 'capif_https_port')
            capif_register_port = _setting('CAPIF_REGISTER_PORT', config, 'capif_register_port')
            capif_provider_username = _setting('CAPIF_USERNAME', config, 'capif_username')
            capif_provider_password = _setting('CAPIF_PASSWORD', config, 'capif_password')

            # Get CSR (Certificate Signing Request) details from config or environment variables
            cert_generation = provider_config.get('cert_generation', {})
            csr_common_name = _setting('PROVIDER_CSR_COMMON_NAME', cert_generation, 'csr_common_name')
            csr_organizational_unit = _setting(
                'PROVIDER_CSR_ORGANIZATIONAL_UNIT', cert_generation, 'csr_organizational_unit')
            csr_organization = _setting('PROVIDER_CSR_ORGANIZATION', cert_generation, 'csr_organization')
            csr_locality = _setting('PROVIDER_CSR_LOCALITY', cert_generation, 'csr_locality')
            csr_state_or_province_name = _setting(
                'PROVIDER_CSR_STATE_OR_PROVINCE_NAME', cert_generation, 'csr_state_or_province_name')
            csr_country_name = _setting('PROVIDER_CSR_COUNTRY_NAME', cert_generation, 'csr_country_name')
            csr_email_address = _setting('PROVIDER_CSR_EMAIL_ADDRESS', cert_generation, 'csr_email_address')

            # Retrieve provider specific values (APFs, AEFs)
            supported_features = _setting(
                'PROVIDER_SUPPORTED_FEATURES', provider_config, 'supported_features') or "0"
            apfs = _setting('PROVIDER_APFS', provider_config, 'apfs')
            aefs = _setting('PROVIDER_AEFS', provider_config, 'aefs')
            api_description_path = os.path.abspath(
                _setting('PROVIDER_API_DESCRIPTION_PATH', provider_config, 'api_description_path'))

            # Check required fields and log warnings/errors
            if not capif_host:
                self.logger.warning("CAPIF_HOST is not provided; defaulting to an empty string")
            if not capif_provider_username:
                self.logger.error("CAPIF_PROVIDER_USERNAME is required but not provided")
                raise ValueError("CAPIF_PROVIDER_USERNAME is required")

            # Setup the folder to store provider files (e.g., certificates)
            self.provider_folder = os.path.join(provider_general_folder, capif_provider_username)
            os.makedirs(self.provider_folder, exist_ok=True)

            # Set attributes for provider credentials and configuration
            self.capif_host = capif_host
            self.capif_provider_username = capif_provider_username
            self.capif_provider_password = capif_provider_password
            self.capif_register_host = capif_register_host
            self.capif_register_port = capif_register_port
            self.csr_common_name = csr_common_name
            self.csr_organizational_unit = csr_organizational_unit
            self.csr_organization = csr_organization
            self.csr_locality = csr_locality
            self.csr_state_or_province_name = csr_state_or_province_name
            self.csr_country_name = csr_country_name
            self.csr_email_address = csr_email_address
            self.supported_features = supported_features

            # Fail with a message that names the offending settings instead of
            # the bare "invalid literal for int()" raised on an empty value.
            try:
                self.aefs = int(aefs)
                self.apfs = int(apfs)
            except ValueError as err:
                raise ValueError(
                    f"PROVIDER_AEFS and PROVIDER_APFS must be integers "
                    f"(got aefs={aefs!r}, apfs={apfs!r})"
                ) from err

            # Get publish request details from config or environment variables
            publish_req_config = provider_config.get('publish_req', {})
            self.publish_req = {
                "service_api_id": _setting('PUBLISH_REQ_SERVICE_API_ID', publish_req_config, 'service_api_id'),
                "publisher_apf_id": _setting('PUBLISH_REQ_PUBLISHER_APF_ID', publish_req_config, 'publisher_apf_id'),
                # Stored as found (not stripped or stringified).
                "publisher_aefs_ids": os.getenv(
                    'PUBLISH_REQ_PUBLISHER_AEFS_IDS', publish_req_config.get('publisher_aefs_ids', '')),
            }

            # Set the path for the API description file
            self.api_description_path = api_description_path

            # Set the CAPIF HTTPS port and construct CAPIF URLs
            self.capif_https_port = capif_https_port

            # Reload identifiers persisted in the provider folder, if any
            self.provider_capif_ids = {}
            path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json")
            if os.path.exists(path_prov_funcs):
                self.provider_capif_ids = self._load_provider_api_details()

            # Always define the attribute so later reads do not raise
            # AttributeError when nothing has been published yet.
            self.provider_service_ids = {}
            path_published = os.path.join(self.provider_folder, "provider_service_ids.json")
            if os.path.exists(path_published):
                self.provider_service_ids = self.__load_config_file(path_published)

            # Construct the CAPIF HTTPS URL (the port is omitted when empty or 443)
            if not self.capif_https_port or int(self.capif_https_port) == 443:
                self.capif_https_url = f"https://{capif_host}/"
            else:
                self.capif_https_url = f"https://{capif_host}:{self.capif_https_port}/"

            # Construct the CAPIF register URL (port 8084 when none is configured)
            if not capif_register_port:
                self.capif_register_url = f"https://{capif_register_host}:8084/"
            else:
                self.capif_register_url = f"https://{capif_register_host}:{capif_register_port}/"

            # Event subscription settings.
            # NOTE(review): notification_destination and websock_notif_config
            # previously read PROVIDER_EVENTS_FILTERS as well, so setting that
            # variable overwrote all three values. They now use their own
            # variables; confirm the names against the deployment documentation.
            events_config = provider_config.get('events', {})
            self.events_description = os.getenv(
                'PROVIDER_EVENTS_DESCRIPTION', events_config.get('description', ''))
            self.events_filter = os.getenv(
                'PROVIDER_EVENTS_FILTERS', events_config.get('eventFilters', ''))
            self.notification_destination = os.getenv(
                'PROVIDER_EVENTS_NOTIFICATION_DESTINATION', events_config.get('notificationDestination', ''))
            self.websock_notif_config = os.getenv(
                'PROVIDER_EVENTS_WEBSOCK_NOTIF_CONFIG', events_config.get('websockNotifConfig', ''))

            # Log initialization success message
            self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters")

        except Exception as e:
            # Catch and log any exceptions that occur during initialization
            self.logger.error(f"Error during initialization: {e}")
            raise

    def __store_certificate(self) -> None:
JorgeEcheva26's avatar
JorgeEcheva26 committed
        self.logger.info("Retrieving capif_cert_server.pem...")
JorgeEcheva26's avatar
JorgeEcheva26 committed

        try:
JorgeEcheva26's avatar
JorgeEcheva26 committed
            # Crear un contexto SSL que no valide certificados
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            with socket.create_connection((self.capif_host, self.capif_https_port)) as sock:
                with context.wrap_socket(sock, server_hostname=self.capif_host) as ssock:
                    cert = ssock.getpeercert(binary_form=True)
                    cert_file = os.path.join(self.provider_folder, "capif_cert_server.pem")
                    with open(cert_file, "wb") as f:
Loading
Loading full blame...