from opencapif_sdk import capif_invoker_connector
import os
import logging
import shutil
from requests.auth import HTTPBasicAuth
import urllib3
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    PKey,
    TYPE_RSA,
    X509Req
)
import requests
import json
import warnings
from requests.exceptions import RequestsDependencyWarning

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)  # noqa: E501

# Basic configuration of the logger functionality
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)

# exist_ok=True removes the check-then-create race of the
# "if not exists: makedirs" pattern.
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.NOTSET,  # Minimum severity level to log
    # Log message format
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),  # Log to a file
        logging.StreamHandler()  # Also display in the console
    ]
)


class capif_invoker_event_feature(capif_invoker_connector):
    """Invoker connector extended with CAPIF event subscription support."""

    def create_subscription(self):
        """Create an event subscription and record its id locally.

        Posts a subscription request to
        ``capif-events/v1/{subscriberId}/subscriptions`` using the invoker's
        client certificate, reads the subscription id from the last path
        segment of the ``Location`` response header and appends it to the
        ``capif_subscriptions_id.json`` list kept in the invoker folder.

        Returns:
            ``None`` on success. On any failure (HTTP error, missing
            ``Location`` header, malformed subscriptions file, ...) the error
            is logged and the tuple ``(None, {"error": "<message>"})`` is
            returned instead of raising.
        """
        invoker_capif_details = self.invoker_capif_details
        subscriberId = invoker_capif_details["api_invoker_id"]

        subscriptions_url = (
            self.capif_https_url
            + f"capif-events/v1/{subscriberId}/subscriptions"
        )

        payload = {
            "events": self.events_description,
            "eventFilters": self.events_filter,
            "eventReq": {},  # TODO: still to be improved
            "notificationDestination": f"{self.capif_callback_url}",
            "requestTestNotification": True,
            "websockNotifConfig": {
                "websocketUri": f"{self.capif_callback_url}",
                "requestWebsocketUri": True
            },
            "supportedFeatures": f"{self.supported_features}"
        }

        try:
            # NOTE(review): no timeout is passed, so this call can block for
            # as long as the server keeps the connection open.
            response = requests.post(
                url=subscriptions_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                cert=(self.signed_key_crt_path, self.private_key_path),
                verify=os.path.join(self.invoker_folder, "ca.crt")
            )
            response.raise_for_status()

            location_header = response.headers.get("Location")
            if not location_header:
                # Without the header there is no id to store. Fail explicitly
                # rather than hitting a NameError on an unbound variable
                # further down.
                raise ValueError(
                    "The Location header is not available in the response"
                )

            # Extract the identifier from the URL in the 'Location' header
            identifier = location_header.rstrip('/').split('/')[-1]
            self.logger.info(f"Subscriptionid obtained: {identifier}")

            subscriptions_file = os.path.join(
                self.invoker_folder, "capif_subscriptions_id.json"
            )

            # Load or initialize the subscription list
            if os.path.exists(subscriptions_file):
                subscription = self._load_config_file(subscriptions_file)

                # Ensure subscription is a list
                if not isinstance(subscription, list):
                    raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}")
            else:
                subscription = []

            # Find if the subscriberId already exists in the list
            subscriber_entry = next(
                (
                    item for item in subscription
                    if item.get("subscriberId") == subscriberId
                ),
                None,
            )

            if subscriber_entry is None:
                # If subscriberId is not found, create a new entry
                subscriber_entry = {"subscriberId": subscriberId, "events": []}
                subscription.append(subscriber_entry)

            # The same value is sent as the "events" payload field and may be
            # a list, which cannot be used as a dict key. str() leaves plain
            # strings untouched and makes any other value a valid JSON key.
            event_key = str(self.events_description)

            # Add the event to the subscriber's events list; setdefault
            # tolerates a stored entry that lacks the "events" key.
            subscriber_entry.setdefault("events", []).append(
                {event_key: identifier}
            )

            # Save the updated list back to the file
            self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")

        except Exception as e:
            self.logger.error("Unexpected error: %s", e)
            return None, {"error": f"Unexpected error: {e}"}

    # def delete_subcription(self):

    # def modify_subcription(self):

    # def patch_subcription(self):