"""Event-subscription feature for a CAPIF API invoker.

Importing this module configures the root logger to write to
``logs/sdk_logs.log`` and to the console, and silences the urllib3
``InsecureRequestWarning`` and requests' ``RequestsDependencyWarning``.
"""
import json
import logging
import os
import shutil
import warnings

import requests
import urllib3
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    PKey,
    TYPE_RSA,
    X509Req
)
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestsDependencyWarning

from capif_invoker_connector import capif_invoker_connector
from capif_provider_connector import capif_provider_connector

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)  # noqa: E501

# Basic configuration of the logger functionality
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)

# exist_ok avoids the check-then-create race; the guard skips an empty
# dirname, which os.makedirs would reject.
if log_dir:
    os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.NOTSET,  # Minimum severity level to log
    # Log message format
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),  # Log to a file
        logging.StreamHandler()  # Also display in the console
    ]
)


class capif_invoker_event_feature(capif_invoker_connector):
    """Invoker connector extended with event-subscription operations.

    Only ``create_subscription`` is implemented. The delete/modify/patch
    operations were empty declarations and are kept as explicit stubs.
    """

    def create_subscription(self, cert=None):
        """POST ``self.events_config`` to create an event subscription.

        The request goes to
        ``self.capif_https_url + "/<api_invoker_id>/subscriptions"`` and the
        server certificate is verified against ``ca.crt`` inside
        ``self.invoker_folder``.

        Args:
            cert: Client certificate passed straight to ``requests.post``
                (a path to a PEM file or a ``(cert_path, key_path)`` tuple).
                The previous code referenced an undefined name ``cert``, so
                every call failed with a swallowed ``NameError``. When left
                as ``None`` no client certificate is sent.
                NOTE(review): presumably this should default to the
                invoker's signed certificate and private key stored under
                ``self.invoker_folder`` -- confirm the file names against
                ``capif_invoker_connector`` and wire the default.

        Returns:
            ``(status_code, parsed_json_body)`` on success, or
            ``(None, {"error": "Unexpected error: ..."})`` when the request,
            the HTTP status check or the JSON decoding fails.
        """
        # NOTE(review): inside this class the double-underscore name is
        # mangled to _capif_invoker_event_feature__load_invoker_api_details,
        # so it will not resolve to a private method of the parent class.
        # Confirm where this loader is defined.
        invoker_capif_details = self.__load_invoker_api_details()
        subscriber_id = invoker_capif_details["api_invoker_id"]

        path = self.capif_https_url + f"/{subscriber_id}/subscriptions"
        payload = self.events_config

        try:
            response = requests.post(
                url=path,
                json=payload,
                headers={"Content-Type": "application/json"},
                cert=cert,
                verify=os.path.join(self.invoker_folder, "ca.crt")
            )
            response.raise_for_status()
            return response.status_code, response.json()
        except Exception as e:
            # Boundary handler: callers receive an error payload instead of
            # an exception, matching the original return contract.
            self.logger.error("Unexpected error: %s", e)
            return None, {"error": f"Unexpected error: {e}"}

    def delete_subcription(self):
        """Delete an event subscription (not implemented yet)."""
        raise NotImplementedError("delete_subcription is not implemented")

    def modify_subcription(self):
        """Replace an event subscription (not implemented yet)."""
        raise NotImplementedError("modify_subcription is not implemented")

    def patch_subcription(self):
        """Partially update an event subscription (not implemented yet)."""
        raise NotImplementedError("patch_subcription is not implemented")