Newer
Older
from opencapif_sdk import capif_invoker_connector
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import logging
import shutil
from requests.auth import HTTPBasicAuth
import urllib3
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
dump_certificate_request,
dump_privatekey,
PKey,
TYPE_RSA,
X509Req
)
import requests
import json
import warnings
from requests.exceptions import RequestsDependencyWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)
# noqa: E501
# Basic configuration of the logger functionality.
# Runs at import time: creates the log directory (if needed) and configures
# the root logger to write both to a file and to the console.
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)
# exist_ok=True replaces the previous "check, then create" sequence, which
# could fail if another process created the directory between the two calls.
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    # NOTSET on the root logger means records of every severity are processed.
    level=logging.NOTSET,
    # Log message format
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),  # Log to a file
        logging.StreamHandler(),  # Also display in the console
    ],
)
class capif_invoker_event_feature(capif_invoker_connector):
    """Invoker-side helper that creates CAPIF event subscriptions.

    The class reads its configuration (URLs, certificate paths, event
    description/filter, logger, ...) from attributes on ``self``; these are
    not defined here and are presumably set up by
    ``capif_invoker_connector`` -- confirm against that class.
    """

    def create_subscription(self):
        """Create an event subscription in CAPIF and record its identifier.

        Sends a POST to ``capif-events/v1/{subscriberId}/subscriptions``
        (relative to ``self.capif_https_url``) using the invoker's client
        certificate, extracts the subscription identifier from the
        ``Location`` response header and appends it to the subscription
        list that is persisted through ``self._create_or_update_file``.

        Returns:
            ``None`` on success. On failure, the tuple
            ``(None, {"error": <message>})``.

        Raises:
            KeyError: if ``self.invoker_capif_details`` has no
                ``"api_invoker_id"`` entry (looked up outside the
                ``try`` block, so it is not converted into an error tuple).
        """
        subscriber_id = self.invoker_capif_details["api_invoker_id"]
        subscriptions_url = (
            self.capif_https_url
            + f"capif-events/v1/{subscriber_id}/subscriptions"
        )

        payload = {
            "events": self.events_description,
            "eventFilters": self.events_filter,
            "eventReq": {},  # TODO: populate eventReq (always sent empty for now)
            "notificationDestination": f"{self.capif_callback_url}",
            "requestTestNotification": True,
            "websockNotifConfig": {
                "websocketUri": f"{self.capif_callback_url}",
                "requestWebsocketUri": True
            },
            "supportedFeatures": f"{self.supported_features}"
        }

        try:
            response = requests.post(
                url=subscriptions_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                cert=(self.signed_key_crt_path, self.private_key_path),
                verify=os.path.join(self.invoker_folder, "ca.crt")
            )
            response.raise_for_status()

            location_header = response.headers.get("Location")
            if not location_header:
                # Without the Location header there is no identifier to
                # store. Stop here with an explicit error instead of falling
                # through and failing later on an unbound variable.
                message = "The Location header is not available in the response"
                self.logger.error(message)
                return None, {"error": message}

            # Extract the identifier from the URL in the 'Location' header
            # (its last path segment, ignoring any trailing slash).
            identifier = location_header.rstrip('/').split('/')[-1]
            self.logger.info("Subscriptionid obtained: %s", identifier)

            subscriptions_file = os.path.join(
                self.invoker_folder, "capif_subscriptions_id.json"
            )

            # Load or initialize the subscription list
            if os.path.exists(subscriptions_file):
                subscription = self._load_config_file(subscriptions_file)
                # Ensure subscription is a list
                if not isinstance(subscription, list):
                    raise TypeError(f"Expected 'subscription' to be a list, but got {type(subscription).__name__}")
            else:
                subscription = []

            # Find if the subscriberId already exists in the list
            subscriber_entry = next(
                (
                    item for item in subscription
                    if item.get("subscriberId") == subscriber_id
                ),
                None,
            )
            if subscriber_entry is None:
                # If subscriberId is not found, create a new entry
                subscriber_entry = {"subscriberId": subscriber_id, "events": []}
                subscription.append(subscriber_entry)

            # Add the event to the subscriber's events list.
            # NOTE(review): events_description is used as a dict key here, so
            # it must be hashable (e.g. a string) -- confirm against the
            # configuration that populates it.
            subscriber_entry["events"].append({self.events_description: identifier})

            # Save the updated list back to the file
            self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
        except Exception as e:
            # Boundary handler: every failure inside the block above is
            # reported to the caller as an error tuple rather than raised.
            self.logger.error("Unexpected error: %s", e)
            return None, {"error": f"Unexpected error: {e}"}
# TODO: def delete_subscription(self):
# TODO: def modify_subscription(self):
# TODO: def patch_subscription(self):