Newer
Older
from requests.exceptions import RequestsDependencyWarning
import warnings
import json
import requests
from OpenSSL.crypto import (
dump_certificate_request,
dump_privatekey,
PKey,
TYPE_RSA,
X509Req
)
from OpenSSL.SSL import FILETYPE_PEM
import os
import logging
import shutil
import subprocess
from requests.auth import HTTPBasicAuth
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=logging.NOTSET, # Minimum severity level to log
# Log message format
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path), # Logs to a file
logging.StreamHandler() # Also outputs to the console
"""
Τhis class is responsible for onboarding an exposer (eg. NEF emulator) to CAPIF
"""
def __init__(self, config_file: str):
"""
Initializes the CAPIFProvider connector with the parameters specified in the configuration file.
config_file = os.path.abspath(config_file)
self.config_path = os.path.dirname(config_file)+"/"
config = self.__load_config_file(config_file)
debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
else:
debug_mode = True
# Initialize logger for this class
self.logger = logging.getLogger(self.__class__.__name__)
if debug_mode:
self.logger.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.WARNING)
# Set logging level for urllib based on debug_mode
urllib_logger = logging.getLogger("urllib3")
if not debug_mode:
urllib_logger.setLevel(logging.WARNING)
else:
urllib_logger.setLevel(logging.DEBUG)
try:
# Retrieve provider configuration from JSON or environment variables
provider_config = config.get('provider', {})
os.getenv('PROVIDER_FOLDER', provider_config.get('provider_folder', '')).strip())
capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip()
capif_register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip()
capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip())
capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip())
capif_provider_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip()
capif_provider_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip()
# Get CSR (Certificate Signing Request) details from config or environment variables
cert_generation = provider_config.get('cert_generation', {})
csr_common_name = os.getenv('PROVIDER_CSR_COMMON_NAME', cert_generation.get('csr_common_name', '')).strip()
csr_organizational_unit = os.getenv('PROVIDER_CSR_ORGANIZATIONAL_UNIT', cert_generation.get('csr_organizational_unit', '')).strip()
csr_organization = os.getenv('PROVIDER_CSR_ORGANIZATION', cert_generation.get('csr_organization', '')).strip()
csr_locality = os.getenv('PROVIDER_CSR_LOCALITY', cert_generation.get('csr_locality', '')).strip()
csr_state_or_province_name = os.getenv('PROVIDER_CSR_STATE_OR_PROVINCE_NAME', cert_generation.get('csr_state_or_province_name', '')).strip()
csr_country_name = os.getenv('PROVIDER_CSR_COUNTRY_NAME', cert_generation.get('csr_country_name', '')).strip()
csr_email_address = os.getenv('PROVIDER_CSR_EMAIL_ADDRESS', cert_generation.get('csr_email_address', '')).strip()
# Retrieve provider specific values (APFs, AEFs)
supported_features = os.getenv('PROVIDER_SUPPORTED_FEATURES', provider_config.get('supported_features', '')).strip()
if not supported_features:
supported_features = "0"
apfs = os.getenv('PROVIDER_APFS', provider_config.get('apfs', '')).strip()
aefs = os.getenv('PROVIDER_AEFS', provider_config.get('aefs', '')).strip()
api_description_path = os.path.abspath(os.getenv('PROVIDER_API_DESCRIPTION_PATH', provider_config.get('api_description_path', '')).strip())
# Check required fields and log warnings/errors
self.logger.warning("CAPIF_HOST is not provided; defaulting to an empty string")
self.logger.error("CAPIF_PROVIDER_USERNAME is required but not provided")
raise ValueError("CAPIF_PROVIDER_USERNAME is required")
# Setup the folder to store provider files (e.g., certificates)
self.provider_folder = os.path.join(provider_general_folder, capif_provider_username)
os.makedirs(self.provider_folder, exist_ok=True)
# Set attributes for provider credentials and configuration
self.capif_host = capif_host.strip()
self.capif_provider_username = capif_provider_username
self.capif_provider_password = capif_provider_password
self.capif_register_host = capif_register_host
self.capif_register_port = capif_register_port
self.csr_common_name = csr_common_name
self.csr_organizational_unit = csr_organizational_unit
self.csr_organization = csr_organization
self.csr_state_or_province_name = csr_state_or_province_name
self.csr_country_name = csr_country_name
self.csr_email_address = csr_email_address
self.supported_features = supported_features
# Get publish request details from config or environment variables
publish_req_config = provider_config.get('publish_req', {})
"service_api_id": os.getenv('PUBLISH_REQ_SERVICE_API_ID', publish_req_config.get('service_api_id', '')).strip(),
"publisher_apf_id": os.getenv('PUBLISH_REQ_PUBLISHER_APF_ID', publish_req_config.get('publisher_apf_id', '')).strip(),
"publisher_aefs_ids": os.getenv('PUBLISH_REQ_PUBLISHER_AEFS_IDS', publish_req_config.get('publisher_aefs_ids', ''))
self.api_description_path = api_description_path
# Set the CAPIF HTTPS port and construct CAPIF URLs
path_prov_funcs=os.path.join(self.provider_folder,"provider_capif_ids.json")
self.provider_capif_ids=self.__load_provider_api_details()
path_published=os.path.join(self.provider_folder,"provider_service_ids.json")
self.provider_service_ids=self.__load_config_file(path_published)
if len(self.capif_https_port) == 0 or int(self.capif_https_port) == 443:
self.capif_https_url = f"https://{capif_host.strip()}/"
else:
self.capif_https_url = f"https://{capif_host.strip()}:{self.capif_https_port.strip()}/"
if len(capif_register_port) == 0:
self.capif_register_url = f"https://{capif_register_host.strip()}:8084/"
else:
self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/"
# Log initialization success message
self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters")
# Catch and log any exceptions that occur during initialization
self.logger.error(f"Error during initialization: {e}")
raise
def __store_certificate(self) -> None:
# Retrieves and stores the cert_server.pem from CAPIF.
self.logger.info(
"Retrieving capif_cert_server.pem, this may take a few minutes.")
cmd = f"openssl s_client -connect {self.capif_host}:{self.capif_https_port} | openssl x509 -text > {self.provider_folder}/capif_cert_server.pem"
try:
# Redirects standard output and error to os.devnull to hide logs
with open(os.devnull, 'w') as devnull:
subprocess.run(cmd, shell=True, check=True,
stdout=devnull, stderr=devnull)
cert_file = os.path.join(
self.provider_folder, "capif_cert_server.pem")
if os.path.exists(cert_file) and os.path.getsize(cert_file) > 0:
self.logger.info("cert_server.pem successfully generated!")
else:
self.logger.error("Failed to generate cert_server.pem.")
Loading
Loading full blame...