# capif_event_feature.py
from opencapif_sdk import capif_invoker_connector
import os
import logging
import shutil
from requests.auth import HTTPBasicAuth
import urllib3
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
    dump_certificate_request,
    dump_privatekey,
    PKey,
    TYPE_RSA,
    X509Req
)
import requests
import json
import warnings
from requests.exceptions import RequestsDependencyWarning
# Process-wide warning filters, applied at import time:
# silence urllib3's InsecureRequestWarning ...
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ... and ignore the RequestsDependencyWarning category exposed by requests.
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)
# noqa: E501
# Basic configuration of the logger functionality.
# NOTE: this runs at import time; logging.basicConfig() configures the root
# logger, and is a no-op if the root logger already has handlers.

log_path = 'logs/sdk_logs.log'

log_dir = os.path.dirname(log_path)

# exist_ok=True replaces the former "check, then create" sequence, which could
# fail if another process created the directory between the two calls.
os.makedirs(log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.NOTSET,  # Root logger at NOTSET: no filtering by severity
    # Log message format
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),  # Log to a file
        logging.StreamHandler()  # Also display in the console
    ]
)

class capif_invoker_event_feature(capif_invoker_connector):
    """Invoker connector that adds CAPIF event-subscription creation."""

    def create_subscription(self):
        """Create a CAPIF event subscription and record its id locally.

        POSTs a subscription built from ``self.events_description``,
        ``self.events_filter``, ``self.capif_callback_url`` and
        ``self.supported_features`` to
        ``capif-events/v1/{api_invoker_id}/subscriptions`` under
        ``self.capif_https_url``, presenting the invoker certificate/key pair
        and verifying the server against ``ca.crt`` in ``self.invoker_folder``.

        The subscription id is the last path segment of the ``Location``
        response header. It is appended to this subscriber's entry in the list
        read from ``capif_subscriptions_id.json`` (in ``self.invoker_folder``)
        and the list is handed to ``self._create_or_update_file`` to be saved.

        Returns:
            ``None`` on success. On failure (HTTP error, missing ``Location``
            header, malformed subscriptions file, ...) the tuple
            ``(None, {"error": <message>})``.

        Raises:
            KeyError: if ``self.invoker_capif_details`` has no
                ``"api_invoker_id"`` entry (looked up outside the ``try``).
        """
        subscriber_id = self.invoker_capif_details["api_invoker_id"]

        url = self.capif_https_url + f"capif-events/v1/{subscriber_id}/subscriptions"

        payload = {
            "events": self.events_description,
            "eventFilters": self.events_filter,
            "eventReq": {},  # TODO: still an empty placeholder
            "notificationDestination": f"{self.capif_callback_url}",
            "requestTestNotification": True,
            "websockNotifConfig": {
                "websocketUri": f"{self.capif_callback_url}",
                "requestWebsocketUri": True
            },
            "supportedFeatures": f"{self.supported_features}"
        }

        try:
            # NOTE(review): no timeout is passed, so requests will wait
            # indefinitely on a stalled server.
            response = requests.post(
                url=url,
                json=payload,
                headers={"Content-Type": "application/json"},
                cert=(self.signed_key_crt_path, self.private_key_path),
                verify=os.path.join(self.invoker_folder, "ca.crt")
            )
            response.raise_for_status()

            location_header = response.headers.get("Location")
            if not location_header:
                # Without the header there is no id to store; report it
                # explicitly instead of failing later on an unbound variable.
                message = "The Location header is not available in the response"
                self.logger.error(message)
                return None, {"error": message}

            # Extract the identifier from the URL in the 'Location' header
            identifier = location_header.rstrip('/').split('/')[-1]
            self.logger.info("Subscriptionid obtained: %s", identifier)

            store_path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json")

            # Load or initialize the subscription list
            if os.path.exists(store_path):
                subscriptions = self._load_config_file(store_path)
                # Ensure the stored document is a list
                if not isinstance(subscriptions, list):
                    raise TypeError(
                        f"Expected 'subscription' to be a list, but got {type(subscriptions).__name__}"
                    )
            else:
                subscriptions = []

            # Reuse this subscriber's entry if present, otherwise create it
            subscriber_entry = next(
                (item for item in subscriptions if item.get("subscriberId") == subscriber_id),
                None,
            )
            if subscriber_entry is None:
                subscriber_entry = {"subscriberId": subscriber_id, "events": []}
                subscriptions.append(subscriber_entry)

            # events_description is also sent as the "events" payload field and
            # may therefore be a list, which cannot be a dict key; str() leaves
            # plain strings untouched and makes any other value usable.
            event_key = str(self.events_description)
            subscriber_entry["events"].append({event_key: identifier})

            # Save the updated list back to the file
            self._create_or_update_file("capif_subscriptions_id", "json", subscriptions, "w")

        except Exception as e:
            self.logger.error("Unexpected error: %s", e)
            return None, {"error": f"Unexpected error: {e}"}

    # def delete_subscription(self):
    # def modify_subscription(self):
    # def patch_subscription(self):