Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ocf/sdk
1 result
Show changes
Showing
with 916 additions and 48 deletions
doc/images/flows_invoker_check_authentication.jpg

99.9 KiB

doc/images/flows_invoker_discover.jpg

99.8 KiB

doc/images/flows_invoker_get_tokens.jpg

101 KiB

doc/images/flows_invoker_onboard.jpg

101 KiB

doc/images/flows_invoker_path.jpg

104 KiB

doc/images/flows_invoker_update_offboard.jpg

99 KiB

doc/images/flows_provider_onboard.jpg

102 KiB

doc/images/flows_provider_path.jpg

101 KiB

doc/images/flows_provider_publish_functions.jpg

101 KiB

doc/images/flows_provider_update_offboard.jpg

98.9 KiB

doc/images/flows_updated_opencapif.jpg

98.4 KiB

doc/images/publish_req_example.png

82.4 KiB

......@@ -32,7 +32,7 @@ When configuring the SDK as a **Network App Invoker**, the following fields must
- `invoker_folder`
- `capif_callback_url`
- `supported_features`
- `cert_generation` (fields such as `csr_common_name`, `csr_country_name`, etc.)
- `cert_generation` (fields such as `csr_common_name`, `csr_country_name`, etc.) For csr_country_name it is important to fulfill the field with [THIS format](https://www.ssl.com/country-codes/)
**Optional:**
- `discover_filter`: useful to enable the discovery of specific APIs. Some fields under [`discover_filter`](#configuration-of-discover_filter) structure required to be configured when using discovery filters. Check devoted section below,
......@@ -97,6 +97,7 @@ This file can also be populated using [environment variables](../samples/envirom
- [`publish_req`](#configuration-of-publish_req): Fields required for API publishing.
- `api_description_path`: The path to the [ServiceAPIDescription](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Publish_Service_API.yaml) JSON file.
- `check_authentication_data`: The `ip` and `port` of the target Provider's AEF to get their supported features from.
- `log`: The structure defined in the [Log schema](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Logging_API_Invocation_API.yaml), it is not needed to fulfill the `apiId` field.
## Configuration via `capif_sdk_register.json`
......
......@@ -15,7 +15,7 @@ Before using the SDK, the following steps should be completed:
## Available SDK Usage Modes
![GENERAL CAPIF USAGE FLOW](./images/flows_updated_opencapif.jpg)
![GENERAL CAPIF USAGE FLOW](./images/flows-updated_opencapif.jpg)
The repository provides two modes for utilizing the OpenCAPIF SDK:
......@@ -77,7 +77,7 @@ OpenCAPIF SDK references:
The SDK simplifies the onboarding process, allowing providers to register multiple APFs and AEFs. All APFs, AEFs, and AMF certificates are created and stored in `provider_service_ids.json`.
![Provider_onboard](./images/flows_provider_onboard.jpg)
![Provider_onboard](./images/flows-provider_onboard.jpg)
### Service Publishing
......@@ -110,6 +110,7 @@ The SDK simplifies API deletion. Service deletion requires prior onboarding and
**Required SDK inputs**:
- publisher_apf_id
- publisher_aefs_ids
- service_api_id
### Service Update
......@@ -147,7 +148,7 @@ Retrieve information about all previously published services in `service_receive
**Required SDK input**:
- publisher_apf_id
![Provider_publish](./images/flows_provider_publish_functions.jpg)
![Provider_publish](./images/flows-provider_publish_functions.jpg)
### Update and Offboard Provider
......@@ -160,7 +161,95 @@ OpenCAPIF SDK references:
The provider must be onboarded before using these features.
![Provider_update-offboard](./images/flows_provider_update_offboard.jpg)
![Provider_update-offboard](./images/flows-provider_update_offboard.jpg)
### Create logs
OpenCAPIF SDK references:
- **Function**: `create_logs(aefId, api_invoker_id)`
The provider notifies to the CCF that the published API has been used by certain invoker.
For leveraging this feature the Provider must have onboarded and published an API previously.
**Required SDK input**:
- aefId (Within the function)
- api_invoker_id (Within the function)
- log (Within [SDK configuration](./sdk_configuration.md) or object)
![Provider_logs](./images/flows-provider_logs.jpg)
### Create subscription
OpenCAPIF SDK references:
- **Function**: `create_subscription(name, id)`
The provider ask to the CCF about notifications related to services such as SERVICE_API_AVAILABLE or API_INVOKER_UPDATED.
This services are specificated in [CAPIF Events API management](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Events_API.yaml) explained in [SDK configuration](./sdk_configuration.md#events_configuration)
For leveraging this feature the Provider must have onboarded previously.
**Required SDK input**:
- aefId//apfId//amfId (Within the function)
- name: An arbitrary name we want to set in order to store it.
- events (Within [SDK configuration](./sdk_configuration.md#events_configuration) or object)
### Delete subscription
OpenCAPIF SDK references:
- **Function**: `delete_subscription(name, id)`
The provider ask to the CCF to withdraw the subscription to the notifications asked previously
For leveraging this feature the Provider must have onboarded and created a subscription previously.
**Required SDK input**:
- aefId//apfId//amfId (Within the function)
- name: The name of your subscription.
### Update subscription
OpenCAPIF SDK references:
- **Function**: `update_subscription(name, id)`
The provider ask to the CCF about updating the subscription for receiving different services such as SERVICE_API_AVAILABLE or API_INVOKER_UPDATED, changing the URL for receiving the notifications...
This services are specificated in [CAPIF Events API management](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Events_API.yaml) explained in [SDK configuration](./sdk_configuration.md#events_configuration)
For leveraging this feature the Provider must have onboarded and created a subscription previously.
![Events_feature](./images/flows-event_subscription.jpg)
**ONLY AVAILABLE IN CAPIF RELEASE 2**
**Required SDK input**:
- aefId//apfId//amfId (Within the function)
- name: The name of your subscription.
- events (Within [SDK configuration](./sdk_configuration.md#events_configuration) or object)
### Patch subscription
OpenCAPIF SDK references:
- **Function**: `update_subscription(name, id)`
The provider ask to the CCF about updating the subscription for receiving different services such as SERVICE_API_AVAILABLE or API_INVOKER_UPDATED.
This services are specificated in [CAPIF Events API management](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Events_API.yaml) explained in [SDK configuration](./sdk_configuration.md#events_configuration)
For leveraging this feature the Provider must have onboarded and created a subscription previously.
**ONLY AVAILABLE IN CAPIF RELEASE 2**
**Required SDK input**:
- aefId//apfId//amfId (Within the function)
- name: The name of your subscription.
- events (Within [SDK configuration](./sdk_configuration.md#events_configuration) or object)
## Invoker Network App
......@@ -187,7 +276,7 @@ OpenCAPIF SDK references:
The SDK streamlines the invoker onboarding process, storing the `api_invoker_id` in the `capif_api_security_context_details.json`.
![Invoker_onboard](./images/flows_invoker_onboard.jpg)
![Invoker_onboard](./images/flows-invoker_onboard.jpg)
### Service Discovery
......@@ -201,7 +290,7 @@ The [discover_filter](./sdk_configuration.md) can be used to retrieve access to
Use the [discover_filter](./sdk_configuration.md) to retrieve access to target APIs. Ensure you are [onboarded as an invoker](#invoker-onboarding) before using this feature.
![Invoker_discover](./images/flows_invoker_discover.jpg)
![Invoker_discover](./images/flows-invoker_discover.jpg)
### Obtain JWT Tokens
......@@ -211,7 +300,7 @@ OpenCAPIF SDK references:
The SDK facilitates JWT token creation for secure access to target APIs. This process stores JWT access token in `capif_api_security_context_details.json`.
![Invoker_get_token](./images/flows_invoker_get_tokens.jpg)
![Invoker_get_token](./images/flows-invoker_get_tokens.jpg)
### Check authentication
......@@ -225,7 +314,7 @@ It is mandatory to have obtained the [JWT token](#obtain-jwt-tokens) previously.
**Required SDK inputs**:
- check_authentication_data
![Invoker_check_authentication](./images/flows_invoker_check_authentication.jpg)
![Invoker_check_authentication](./images/flows-invoker_check_authentication.jpg)
### Update and Offboard Invoker
......@@ -235,7 +324,7 @@ OpenCAPIF SDK references:
Onboarding is required before utilizing these functions.
![Invoker_update-offboard](./images/flows_invoker_update_offboard.jpg)
![Invoker_update-offboard](./images/flows-invoker_update_offboard.jpg)
## Other Features
......@@ -266,4 +355,4 @@ OpenCAPIF SDK reference:
Simplifies the logout process for admin users and removes a CAPIF user.
![Register picture](./images/flows_sdk_with_register.jpg)
![Register picture](./images/flows-sdk_with_register.jpg)
......@@ -61,6 +61,15 @@
"csr_country_name": "",
"csr_email_address": ""
},
"api_description_path": ""
"api_description_path": "",
"log":{
"apiName": "",
"apiVersion": "",
"resourceName": "",
"uri": "",
"protocol": "",
"operation": "",
"result": ""
}
}
}
......@@ -61,6 +61,15 @@
"csr_country_name": "",
"csr_email_address": ""
},
"api_description_path": ""
"api_description_path": "",
"log":{
"apiName": "",
"apiVersion": "",
"resourceName": "",
"uri": "",
"protocol": "",
"operation": "",
"result": ""
}
}
}
......@@ -2,5 +2,7 @@ from opencapif_sdk.capif_invoker_connector import capif_invoker_connector
from opencapif_sdk.capif_provider_connector import capif_provider_connector
from opencapif_sdk.service_discoverer import service_discoverer
from opencapif_sdk.api_schema_translator import api_schema_translator
from opencapif_sdk.capif_logging_feature import capif_logging_feature
from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature
__all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator"]
\ No newline at end of file
__all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature", "capif_invoker_event_feature", "capif_provider_event_feature"]
\ No newline at end of file
from opencapif_sdk import capif_invoker_connector,capif_provider_connector
import os
import logging
import shutil
from requests.auth import HTTPBasicAuth
import urllib3
from OpenSSL.SSL import FILETYPE_PEM
from OpenSSL.crypto import (
dump_certificate_request,
dump_privatekey,
PKey,
TYPE_RSA,
X509Req
)
import requests
import json
import warnings
from requests.exceptions import RequestsDependencyWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)
# noqa: E501
# Basic configuration of the logger functionality
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=logging.NOTSET, # Minimum severity level to log
# Log message format
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path), # Log to a file
logging.StreamHandler() # Also display in the console
]
)
class capif_invoker_event_feature(capif_invoker_connector):
def create_subscription(self, name):
invoker_capif_details = self.invoker_capif_details
subscriberId = invoker_capif_details["api_invoker_id"]
path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions"
payload = {
"events": self.events_description,
"eventFilters": self.events_filter,
"eventReq": {}, # TO IMPROVE !!!
"notificationDestination": f"{self.capif_callback_url}",
"requestTestNotification": True,
"websockNotifConfig": {
"websocketUri": f"{self.capif_callback_url}",
"requestWebsocketUri": True
},
"supportedFeatures": f"{self.supported_features}"
}
try:
response = requests.post(
url=path,
json=payload,
headers={"Content-Type": "application/json"},
cert=(self.signed_key_crt_path, self.private_key_path),
verify=os.path.join(self.invoker_folder, "ca.crt")
)
response.raise_for_status()
location_header = response.headers.get("Location")
if location_header:
# Extrae el identificador de la URL en el encabezado 'Location'
identifier = location_header.rstrip('/').split('/')[-1]
self.logger.info(f"Subscriptionid obtained: {identifier}")
else:
self.logger.error("The Location header is not available in the response")
path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json")
# Load or initialize the subscription dictionary
# Load or initialize the subscription dictionary
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
else:
subscription = {}
if not isinstance(subscriberId, (str, int)):
raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}")
# Convert events_description to a string if it isn't already
if not isinstance(name, str):
name = str(name)
if str(subscriberId) not in subscription:
# If the subscriberId is not in the subscription, create an empty dictionary for it
subscription[str(subscriberId)] = {}
# Update the subscription structure
subscription[str(subscriberId)][name] = identifier
# Save the updated dictionary back to the file
self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
def delete_subscription(self, name):
invoker_capif_details = self.invoker_capif_details
subscriberId = invoker_capif_details["api_invoker_id"]
path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json")
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
if subscriberId in subscription and name in subscription[subscriberId]:
identifier = subscription[subscriberId][name]
# Attempt to delete the subscription from CAPIF
delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}"
try:
response = requests.delete(
url=delete_path,
headers={"Content-Type": "application/json"},
cert=(self.signed_key_crt_path, self.private_key_path),
verify=os.path.join(self.invoker_folder, "ca.crt")
)
response.raise_for_status()
# Remove the service entry from the subscription dictionary
del subscription[subscriberId][name]
# If no more services exist for the subscriber, remove the subscriber entry
if not subscription[subscriberId]:
del subscription[subscriberId]
# Save the updated dictionary back to the file
self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
self.logger.info(f"Successfully deleted subscription for service '{name}'")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
else:
self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'")
return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"}
else:
self.logger.error("Subscription file not found at path: %s", path)
return None, {"error": "Subscription file not found"}
def update_subcription(self, name):
invoker_capif_details = self.invoker_capif_details
subscriberId = invoker_capif_details["api_invoker_id"]
path = os.path.join(self.invoker_folder, "capif_subscriptions_id.json")
payload = {
"events": self.events_description,
"eventFilters": self.events_filter,
"eventReq": {}, # TO IMPROVE !!!
"notificationDestination": f"{self.capif_callback_url}",
"requestTestNotification": True,
"websockNotifConfig": {
"websocketUri": f"{self.capif_callback_url}",
"requestWebsocketUri": True
},
"supportedFeatures": f"{self.supported_features}"
}
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
if subscriberId in subscription and name in subscription[subscriberId]:
identifier = subscription[subscriberId][name]
# Attempt to delete the subscription from CAPIF
put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}"
try:
response = requests.put(
url=put_path,
json=payload,
headers={"Content-Type": "application/json"},
cert=(self.signed_key_crt_path, self.private_key_path),
verify=os.path.join(self.invoker_folder, "ca.crt")
)
response.raise_for_status()
self.logger.info(f"Successfully updated subscription for service '{name}'")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
else:
self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'")
return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"}
else:
self.logger.error("Subscription file not found at path: %s", path)
return None, {"error": "Subscription file not found"}
def patch_subcription(self, name):
self.update_subcription(self, name)
class capif_provider_event_feature(capif_provider_connector):
def create_subscription(self, name, id):
subscriberId = id
path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions"
list_of_ids = self._load_provider_api_details()
number = self._find_key_by_value(list_of_ids, id)
payload = {
"events": self.events_description,
"eventFilters": self.events_filter,
"eventReq": {}, # TO IMPROVE !!!
"notificationDestination": f"{self.notification_destination}",
"requestTestNotification": True,
"websockNotifConfig": self.websock_notif_config,
"supportedFeatures": f"{self.supported_features}"
}
number_low = number.lower()
cert = (
os.path.join(self.provider_folder, f"{number_low}.crt"),
os.path.join(self.provider_folder, f"{number}_private_key.key"),
)
try:
response = requests.post(
url=path,
json=payload,
headers={"Content-Type": "application/json"},
cert=cert,
verify=os.path.join(self.provider_folder, "ca.crt")
)
response.raise_for_status()
location_header = response.headers.get("Location")
if location_header:
# Extrae el identificador de la URL en el encabezado 'Location'
identifier = location_header.rstrip('/').split('/')[-1]
self.logger.info(f"Subscriptionid obtained: {identifier}")
else:
self.logger.error("The Location header is not available in the response")
path = os.path.join(self.provider_folder, "capif_subscriptions_id.json")
# Load or initialize the subscription dictionary
# Load or initialize the subscription dictionary
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
else:
subscription = {}
if not isinstance(subscriberId, (str, int)):
raise TypeError(f"Expected 'subscriberId' to be a string or integer, but got {type(subscriberId).__name__}")
# Convert events_description to a string if it isn't already
if not isinstance(name, str):
name = str(name)
if str(subscriberId) not in subscription:
# If the subscriberId is not in the subscription, create an empty dictionary for it
subscription[str(subscriberId)] = {}
# Update the subscription structure
subscription[str(subscriberId)][name] = identifier
# Save the updated dictionary back to the file
self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
def delete_subscription(self, name, id):
subscriberId = id
path = os.path.join(self.provider_folder, "capif_subscriptions_id.json")
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
if subscriberId in subscription and name in subscription[subscriberId]:
identifier = subscription[subscriberId][name]
# Attempt to delete the subscription from CAPIF
delete_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}"
list_of_ids = self._load_provider_api_details()
number = self._find_key_by_value(list_of_ids, id)
number_low = number.lower()
cert = (
os.path.join(self.provider_folder, f"{number_low}.crt"),
os.path.join(self.provider_folder, f"{number}_private_key.key"),
)
try:
response = requests.delete(
url=delete_path,
headers={"Content-Type": "application/json"},
cert=cert,
verify=os.path.join(self.provider_folder, "ca.crt")
)
response.raise_for_status()
# Remove the service entry from the subscription dictionary
del subscription[subscriberId][name]
# If no more services exist for the subscriber, remove the subscriber entry
if not subscription[subscriberId]:
del subscription[subscriberId]
# Save the updated dictionary back to the file
self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
self.logger.info(f"Successfully deleted subscription for service '{name}'")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
else:
self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'")
return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"}
else:
self.logger.error("Subscription file not found at path: %s", path)
return None, {"error": "Subscription file not found"}
def update_subcription(self, name, id):
subscriberId = id
path = os.path.join(self.provider_folder, "capif_subscriptions_id.json")
list_of_ids = self._load_provider_api_details()
number = self._find_key_by_value(list_of_ids, id)
payload = {
"events": self.events_description,
"eventFilters": self.events_filter,
"eventReq": {}, # TO IMPROVE !!!
"notificationDestination": f"{self.notification_destination}",
"requestTestNotification": True,
"websockNotifConfig": self.websock_notif_config,
"supportedFeatures": f"{self.supported_features}"
}
if os.path.exists(path):
subscription = self._load_config_file(path)
if not isinstance(subscription, dict):
raise TypeError(f"Expected 'subscription' to be a dict, but got {type(subscription).__name__}")
if subscriberId in subscription and name in subscription[subscriberId]:
identifier = subscription[subscriberId][name]
# Attempt to delete the subscription from CAPIF
put_path = self.capif_https_url + f"capif-events/v1/{subscriberId}/subscriptions/{identifier}"
list_of_ids = self._load_provider_api_details()
number = self._find_key_by_value(list_of_ids, id)
number_low = number.lower()
cert = (
os.path.join(self.provider_folder, f"{number_low}.crt"),
os.path.join(self.provider_folder, f"{number}_private_key.key"),
)
try:
response = requests.put(
url=put_path,
json=payload,
headers={"Content-Type": "application/json"},
cert=cert,
verify=os.path.join(self.provider_folder, "ca.crt")
)
response.raise_for_status()
# Remove the service entry from the subscription dictionary
del subscription[subscriberId][name]
# If no more services exist for the subscriber, remove the subscriber entry
if not subscription[subscriberId]:
del subscription[subscriberId]
# Save the updated dictionary back to the file
self._create_or_update_file("capif_subscriptions_id", "json", subscription, "w")
self.logger.info(f"Successfully updated subscription for service '{name}'")
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
else:
self.logger.warning(f"Service '{name}' not found for subscriber '{subscriberId}'")
return None, {"error": f"Service '{name}' not found for subscriber '{subscriberId}'"}
else:
self.logger.error("Subscription file not found at path: %s", path)
return None, {"error": "Subscription file not found"}
def patch_subcription(self, name, id):
self.update_subcription(self, name, id)
\ No newline at end of file
......@@ -46,7 +46,7 @@ class capif_invoker_connector:
config_file = os.path.abspath(config_file)
# Load configuration from file if necessary
config = self.__load_config_file(config_file)
config = self._load_config_file(config_file)
debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
if debug_mode == "false":
......@@ -99,6 +99,11 @@ class capif_invoker_connector:
csr_country_name = os.getenv('INVOKER_CSR_COUNTRY_NAME', csr_config.get('csr_country_name', '')).strip()
csr_email_address = os.getenv('INVOKER_CSR_EMAIL_ADDRESS', csr_config.get('csr_email_address', '')).strip()
# Events configuration
events_config = invoker_config.get('events', {})
self.events_description = os.getenv('INVOKER_EVENTS_DESCRIPTION', events_config.get('description', ''))
self.events_filter = os.getenv('INVOKER_EVENTS_FILTERS', events_config.get('eventFilters', ''))
# Define the invoker folder path and create it if it doesn't exist
self.invoker_folder = os.path.join(invoker_general_folder, capif_username)
os.makedirs(self.invoker_folder, exist_ok=True)
......@@ -137,10 +142,22 @@ class capif_invoker_connector:
)
if os.path.exists(path):
self.invoker_capif_details = self.__load_invoker_api_details()
self.signed_key_crt_path = os.path.join(
self.invoker_folder,
self.capif_username + ".crt"
)
self.private_key_path = os.path.join(
self.invoker_folder,
"private.key"
)
self.pathca = os.path.join(self.invoker_folder, "ca.crt")
self.logger.info("capif_invoker_connector initialized with the JSON parameters")
def __load_config_file(self, config_file: str):
def _load_config_file(self, config_file: str):
"""Loads the configuration file."""
try:
with open(config_file, 'r') as file:
......@@ -194,25 +211,11 @@ class capif_invoker_connector:
+ invoker_capif_details["api_invoker_id"]
)
signed_key_crt_path = os.path.join(
self.invoker_folder,
invoker_capif_details["user_name"] + ".crt"
)
private_key_path = os.path.join(
self.invoker_folder,
"private.key"
)
path = os.path.join(
self.invoker_folder,
"ca.crt"
)
response = requests.request(
"DELETE",
url,
cert=(signed_key_crt_path, private_key_path),
verify=path,
cert=(self.signed_key_crt_path, self.private_key_path),
verify=self.pathca,
)
response.raise_for_status()
self.logger.info("Invoker offboarded successfully")
......@@ -237,7 +240,6 @@ class capif_invoker_connector:
self.logger.info(
"Creating private and public keys for the Invoker cert")
try:
private_key_path = os.path.join(self.invoker_folder, "private.key")
csr_file_path = os.path.join(self.invoker_folder, "cert_req.csr")
......@@ -258,7 +260,7 @@ class capif_invoker_connector:
with open(csr_file_path, "wb+") as f:
f.write(dump_certificate_request(FILETYPE_PEM, req))
public_key = dump_certificate_request(FILETYPE_PEM, req)
with open(private_key_path, "wb+") as f:
with open(self.private_key_path, "wb+") as f:
f.write(dump_privatekey(FILETYPE_PEM, key))
self.logger.info("Keys created successfully")
......@@ -305,7 +307,7 @@ class capif_invoker_connector:
response.raise_for_status()
response_payload = json.loads(response.text)
ca_root_file_path = os.path.join(self.invoker_folder, "ca.crt")
ca_root_file_path = self.pathca
ca_root_file = open(ca_root_file_path, "wb+")
ca_root_file.write(bytes(response_payload["ca_root"], "utf-8"))
self.logger.info(
......@@ -339,13 +341,12 @@ class capif_invoker_connector:
"Authorization": "Bearer {}".format(capif_access_token),
"Content-Type": "application/json",
}
pathca = os.path.join(self.invoker_folder, "ca.crt")
response = requests.request(
"POST",
url,
headers=headers,
data=payload,
verify=pathca,
verify=self.pathca,
)
response.raise_for_status()
response_payload = json.loads(response.text)
......@@ -443,23 +444,14 @@ class capif_invoker_connector:
"Authorization": "Bearer {}".format(capif_access_token),
"Content-Type": "application/json",
}
signed_key_crt_path = os.path.join(
self.invoker_folder,
self.capif_username + ".crt"
)
private_key_path = os.path.join(
self.invoker_folder,
"private.key"
)
pathca = os.path.join(self.invoker_folder, "ca.crt")
response = requests.request(
"PUT",
url,
headers=headers,
data=payload,
cert=(signed_key_crt_path, private_key_path),
verify=pathca,
cert=(self.signed_key_crt_path, self.private_key_path),
verify=self.pathca,
)
response.raise_for_status()
......@@ -471,6 +463,50 @@ class capif_invoker_connector:
self.logger.error(
f"Error during updating Invoker to CAPIF: {e} - Response: {response.text}")
raise
def _create_or_update_file(self, file_name, file_type, content, mode="w"):
"""
Create or update a file with the specified content.
:param file_name: Name of the file (without extension).
:param file_type: File type or extension (e.g., "txt", "json", "html").
:param content: Content to write into the file. Can be a string, dictionary, or list.
:param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'.
"""
# Validate the mode
if mode not in ["w", "a"]:
raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).")
# Construct the full file name
full_file_name = f"{file_name}.{file_type}"
full_path = os.path.join(self.invoker_folder, full_file_name)
# Ensure the content is properly formatted
if isinstance(content, (dict, list)):
if file_type == "json":
try:
# Serialize content to JSON
content = json.dumps(content, indent=4)
except TypeError as e:
raise ValueError(f"Failed to serialize content to JSON: {e}")
else:
raise TypeError("Content must be a string when the file type is not JSON.")
elif not isinstance(content, str):
raise TypeError("Content must be a string, dictionary, or list.")
try:
# Open the file in the specified mode
with open(full_path, mode, encoding="utf-8") as file:
file.write(content)
# Log success based on the mode
if mode == "w":
self.logger.info(f"File '{full_file_name}' created or overwritten successfully.")
elif mode == "a":
self.logger.info(f"Content appended to file '{full_file_name}' successfully.")
except Exception as e:
self.logger.error(f"Error handling the file '{full_file_name}': {e}")
raise
import os
import logging
import urllib3
import requests
import json
import warnings
from requests.exceptions import RequestsDependencyWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=RequestsDependencyWarning)
# noqa: E501
# Basic configuration of the logger functionality
log_path = 'logs/sdk_logs.log'
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=logging.NOTSET, # Minimum severity level to log
# Log message format
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path), # Log to a file
logging.StreamHandler() # Also display in the console
]
)
class capif_logging_feature:
def __init__(self, config_file: str):
"""
Initializes the CAPIFProvider connector with the parameters specified in the configuration file.
"""
# Load configuration from file if necessary
config_file = os.path.abspath(config_file)
self.config_path = os.path.dirname(config_file)+"/"
config = self.__load_config_file(config_file)
debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower()
if debug_mode == "false":
debug_mode = False
else:
debug_mode = True
# Initialize logger for this class
self.logger = logging.getLogger(self.__class__.__name__)
if debug_mode:
self.logger.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.WARNING)
# Set logging level for urllib based on debug_mode
urllib_logger = logging.getLogger("urllib3")
if not debug_mode:
urllib_logger.setLevel(logging.WARNING)
else:
urllib_logger.setLevel(logging.DEBUG)
try:
# Retrieve provider configuration from JSON or environment variables
provider_config = config.get('provider', {})
provider_general_folder = os.path.abspath(
os.getenv('PROVIDER_FOLDER', provider_config.get('provider_folder', '')).strip())
capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip()
capif_register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip()
capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip())
capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip())
capif_provider_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip()
capif_provider_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip()
# Get CSR (Certificate Signing Request) details from config or environment variables
cert_generation = provider_config.get('cert_generation', {})
csr_common_name = os.getenv('PROVIDER_CSR_COMMON_NAME', cert_generation.get('csr_common_name', '')).strip()
csr_organizational_unit = os.getenv('PROVIDER_CSR_ORGANIZATIONAL_UNIT', cert_generation.get('csr_organizational_unit', '')).strip()
csr_organization = os.getenv('PROVIDER_CSR_ORGANIZATION', cert_generation.get('csr_organization', '')).strip()
csr_locality = os.getenv('PROVIDER_CSR_LOCALITY', cert_generation.get('csr_locality', '')).strip()
csr_state_or_province_name = os.getenv('PROVIDER_CSR_STATE_OR_PROVINCE_NAME', cert_generation.get('csr_state_or_province_name', '')).strip()
csr_country_name = os.getenv('PROVIDER_CSR_COUNTRY_NAME', cert_generation.get('csr_country_name', '')).strip()
csr_email_address = os.getenv('PROVIDER_CSR_EMAIL_ADDRESS', cert_generation.get('csr_email_address', '')).strip()
# Retrieve provider specific values (APFs, AEFs)
supported_features = os.getenv('PROVIDER_SUPPORTED_FEATURES', provider_config.get('supported_features', '')).strip()
if not supported_features:
supported_features = "0"
apfs = os.getenv('PROVIDER_APFS', provider_config.get('apfs', '')).strip()
aefs = os.getenv('PROVIDER_AEFS', provider_config.get('aefs', '')).strip()
api_description_path = os.path.abspath(os.getenv('PROVIDER_API_DESCRIPTION_PATH', provider_config.get('api_description_path', '')).strip())
log = os.getenv('PROVIDER_LOG', provider_config.get('log', ''))
# Check required fields and log warnings/errors
if not capif_host:
self.logger.warning("CAPIF_HOST is not provided; defaulting to an empty string")
if not capif_provider_username:
self.logger.error("CAPIF_PROVIDER_USERNAME is required but not provided")
raise ValueError("CAPIF_PROVIDER_USERNAME is required")
# Setup the folder to store provider files (e.g., certificates)
self.provider_folder = os.path.join(provider_general_folder, capif_provider_username)
os.makedirs(self.provider_folder, exist_ok=True)
# Set attributes for provider credentials and configuration
self.capif_host = capif_host.strip()
self.capif_provider_username = capif_provider_username
self.capif_provider_password = capif_provider_password
self.capif_register_host = capif_register_host
self.capif_register_port = capif_register_port
self.csr_common_name = csr_common_name
self.csr_organizational_unit = csr_organizational_unit
self.csr_organization = csr_organization
self.csr_locality = csr_locality
self.csr_state_or_province_name = csr_state_or_province_name
self.csr_country_name = csr_country_name
self.csr_email_address = csr_email_address
self.supported_features = supported_features
self.aefs = int(aefs)
self.apfs = int(apfs)
# Get publish request details from config or environment variables
publish_req_config = provider_config.get('publish_req', {})
self.publish_req = {
"service_api_id": os.getenv('PUBLISH_REQ_SERVICE_API_ID', publish_req_config.get('service_api_id', '')).strip(),
"publisher_apf_id": os.getenv('PUBLISH_REQ_PUBLISHER_APF_ID', publish_req_config.get('publisher_apf_id', '')).strip(),
"publisher_aefs_ids": os.getenv('PUBLISH_REQ_PUBLISHER_AEFS_IDS', publish_req_config.get('publisher_aefs_ids', ''))
}
# Set the path for the API description file
self.api_description_path = api_description_path
# Set the CAPIF HTTPS port and construct CAPIF URLs
self.capif_https_port = str(capif_https_port)
self.provider_capif_ids = {}
path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json")
if os.path.exists(path_prov_funcs):
self.provider_capif_ids = self.__load_provider_api_details()
path_published = os.path.join(self.provider_folder, "provider_service_ids.json")
if os.path.exists(path_published):
self.provider_service_ids = self.__load_config_file(path_published)
# Construct the CAPIF HTTPS URL
if len(self.capif_https_port) == 0 or int(self.capif_https_port) == 443:
self.capif_https_url = f"https://{capif_host.strip()}/"
else:
self.capif_https_url = f"https://{capif_host.strip()}:{self.capif_https_port.strip()}/"
# Construct the CAPIF register URL
if len(capif_register_port) == 0:
self.capif_register_url = f"https://{capif_register_host.strip()}:8084/"
else:
self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/"
self.__search_aef_and_api_by_name(log)
self.log = log
# Log initialization success message
self.logger.info("capif_logging_feature initialized with the capif_sdk_config.json parameters")
except Exception as e:
# Catch and log any exceptions that occur during initialization
self.logger.error(f"Error during initialization: {e}")
raise
def __search_aef_and_api_by_name(self, log):
"""
Searches for an AEF and API by name and updates self.api_id with the corresponding ID.
Args:
log (dict): A record containing API information, including "apiName".
Raises:
KeyError: If "apiName" is not present in the log.
ValueError: If no ID is associated with the given name.
"""
# Validate that the log contains the "apiName" field
if "apiName" not in log:
raise KeyError("The provided log does not contain 'apiName'.")
# Retrieve the API name
name = log["apiName"]
# Search for the corresponding API ID
self.api_id = self.provider_service_ids.get(name)
# Validate that a valid ID was found
if not self.api_id:
raise ValueError(f"No ID was found for the API '{name}'.")
def create_logs(self, aefId, api_invoker_id):
path = self.capif_https_url + f"/api-invocation-logs/v1/{aefId}/logs"
log_entry = {
"apiId": self.api_id,
"apiName": self.log["apiName"],
"apiVersion": self.log["apiVersion"],
"resourceName": self.log["resourceName"],
"uri": self.log["uri"],
"protocol": self.log["protocol"],
"operation": self.log["operation"],
"result": self.log["result"]
}
payload = {
"aefId": f"{aefId}",
"apiInvokerId": f"{api_invoker_id}",
"logs": [log_entry],
"supportedFeatures": f"{self.supported_features}"
}
provider_details = self.__load_provider_api_details()
AEF_api_prov_func_id = aefId
aef_number = None
for key, value in provider_details.items():
if value == AEF_api_prov_func_id and key.startswith("AEF-"):
aef_inter = key.split("-")[1]
# Obtain the aef number
aef_number = aef_inter.split("_")[0]
break
if aef_number is None:
self.logger.error(
f"No matching AEF found for publisher_aef_id: {AEF_api_prov_func_id}")
raise ValueError("Invalid publisher_aef_id")
cert = (
os.path.join(self.provider_folder, f"aef-{aef_number}.crt"),
os.path.join(self.provider_folder,
f"AEF-{aef_number}_private_key.key"),
)
try:
response = requests.post(
url=path,
json=payload,
headers={"Content-Type": "application/json"},
cert=cert,
verify=os.path.join(self.provider_folder, "ca.crt")
)
response.raise_for_status()
return response.status_code, response.json()
except Exception as e:
self.logger.error("Unexpected error: %s", e)
return None, {"error": f"Unexpected error: {e}"}
def __load_provider_api_details(self) -> dict:
"""
Loads NEF API details from the CAPIF provider details JSON file.
:return: A dictionary containing NEF API details.
:raises FileNotFoundError: If the CAPIF provider details file is not found.
:raises json.JSONDecodeError: If there is an error decoding the JSON file.
"""
file_path = os.path.join(self.provider_folder,
"provider_capif_ids.json")
try:
with open(file_path, "r") as file:
return json.load(file)
except FileNotFoundError:
self.logger.error(f"File not found: {file_path}")
raise
except json.JSONDecodeError as e:
self.logger.error(
f"Error decoding JSON from file {file_path}: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error while loading NEF API details: {e}")
raise
def __load_config_file(self, config_file: str):
"""Carga el archivo de configuración."""
try:
with open(config_file, 'r') as file:
return json.load(file)
except FileNotFoundError:
self.logger.warning(
f"Configuration file {config_file} not found. Using defaults or environment variables.")
return {}