Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ocf/capif
1 result
Show changes
Commits on Source (19)
Showing
with 433 additions and 111 deletions
......@@ -15,6 +15,17 @@ from ..core.sign_certificate import sign_certificate
from ..util import dict_to_camel_case, clean_empty, serialize_clean_camel_case
TOTAL_FEATURES = 2
SUPPORTED_FEATURES_HEX = "0"
def return_negotiated_supp_feat_dict(supp_feat):
final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1]
return {
"PatchUpdate": True if final_supp_feat[0] == "1" else False,
"RNAA": True if final_supp_feat[1] == "1" else False,
"Final": hex(int(final_supp_feat[::-1], 2))[2:]
}
class ProviderManagementOperations(Resource):
def __check_api_provider_domain(self, api_prov_dom_id):
......@@ -49,6 +60,9 @@ class ProviderManagementOperations(Resource):
api_provider_enrolment_details.api_prov_dom_id = secrets.token_hex(
15)
negotiated_supported_features = return_negotiated_supp_feat_dict(api_provider_enrolment_details.supp_feat)
api_provider_enrolment_details.supp_feat = negotiated_supported_features["Final"]
current_app.logger.debug("Generating certs to api prov funcs")
......@@ -135,6 +149,9 @@ class ProviderManagementOperations(Resource):
if isinstance(result, Response):
return result
negotiated_supported_features = return_negotiated_supp_feat_dict(api_provider_enrolment_details.supp_feat)
api_provider_enrolment_details.supp_feat = negotiated_supported_features["Final"]
for func in api_provider_enrolment_details.api_prov_funcs:
if func.api_prov_func_id is None:
......
......@@ -11,6 +11,21 @@ from .responses import internal_server_error, not_found_error, make_response, ba
from ..util import serialize_clean_camel_case, clean_empty, dict_to_camel_case
TOTAL_FEATURES = 4
SUPPORTED_FEATURES_HEX = "c"
def return_negotiated_supp_feat_dict(supp_feat):
final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1]
return {
"NotificationTestEvent": True if final_supp_feat[0] == "1" else False,
"NotificationWebsocket": True if final_supp_feat[1] == "1" else False,
"EnhancedEventReport": True if final_supp_feat[2] == "1" else False,
"ApiStatusMonitoring": True if final_supp_feat[3] == "1" else False,
"Final": hex(int(final_supp_feat[::-1], 2))[2:]
}
class EventSubscriptionsOperations(Resource):
def __check_subscriber_id(self, subscriber_id):
......@@ -32,7 +47,7 @@ class EventSubscriptionsOperations(Resource):
return not_found_error(detail="Invoker or APF or AEF or AMF Not found", cause="Subscriber Not Found")
return None
def __check_event_filters(self, events, filters):
current_app.logger.debug("Checking event filters.")
valid_filters = {
......@@ -86,10 +101,11 @@ class EventSubscriptionsOperations(Resource):
if isinstance(result, Response):
return result
negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)
# Check if EnhancedEventReport is enabled and validate event filters
if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"]:
if negotiated_supported_features["EnhancedEventReport"]:
if event_subscription.event_filters:
current_app.logger.debug(event_subscription.event_filters)
result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
......@@ -109,6 +125,10 @@ class EventSubscriptionsOperations(Resource):
evnt = dict()
evnt["subscriber_id"] = subscriber_id
evnt["subscription_id"] = subscription_id
# Edit supported_features field to the negotiated one
event_subscription.supported_features = negotiated_supported_features["Final"]
evnt.update(event_subscription.to_dict())
mycol.insert_one(evnt)
......@@ -159,13 +179,13 @@ class EventSubscriptionsOperations(Resource):
exception= "An exception occurred in delete event"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
def put_event(self, event_subscription, subscriber_id, subscription_id):
try:
mycol = self.db.get_col_by_name(self.db.event_collection)
current_app.logger.debug("Updating event subscription")
if event_subscription.supported_features is None:
return bad_request_error(
detail="supportedFeatures not present in request",
......@@ -177,19 +197,32 @@ class EventSubscriptionsOperations(Resource):
if isinstance(result, Response):
return result
if EventSubscription.return_supp_feat_dict(event_subscription.supported_features)["EnhancedEventReport"] and event_subscription.event_filters:
result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
if isinstance(result, Response):
return result
my_query = {'subscriber_id': subscriber_id,
'subscription_id': subscription_id}
'subscription_id': subscription_id}
eventdescription = mycol.find_one(my_query)
if eventdescription is None:
current_app.logger.error("Event subscription not found")
return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")
return not_found_error(detail="Event subscription not exist",
cause="Event API subscription id not found")
negotiated_supported_features = return_negotiated_supp_feat_dict(event_subscription.supported_features)
if negotiated_supported_features["EnhancedEventReport"] and event_subscription.event_filters:
result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
if isinstance(result, Response):
return result
elif (not negotiated_supported_features["EnhancedEventReport"]) and event_subscription.event_filters:
current_app.logger.error("Event filters provided but EnhancedEventReport is not enabled")
return bad_request_error(
detail="Bad Param",
cause="Event filters provided but EnhancedEventReport is not enabled",
invalid_params=[{"param": "eventFilters", "reason": "EnhancedEventReport is not enabled"}]
)
event_subscription.supported_features = negotiated_supported_features["Final"]
body = event_subscription.to_dict()
body["subscriber_id"] = subscriber_id
......@@ -202,11 +235,11 @@ class EventSubscriptionsOperations(Resource):
res = make_response(object=serialize_clean_camel_case(event_subscription), status=200)
return res
except Exception as e:
exception= "An exception occurred in updating event"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
return internal_server_error(detail=exception, cause=str(e))
def patch_event(self, event_subscription, subscriber_id, subscription_id):
......@@ -228,7 +261,9 @@ class EventSubscriptionsOperations(Resource):
current_app.logger.error("Event subscription not found")
return not_found_error(detail="Event subscription not exist", cause="Event API subscription id not found")
if EventSubscription.return_supp_feat_dict(eventdescription.get("supported_features"))["EnhancedEventReport"]:
negotiated_supported_features = return_negotiated_supp_feat_dict(eventdescription.get("supported_features"))
if negotiated_supported_features["EnhancedEventReport"]:
if event_subscription.events and event_subscription.event_filters:
result = self.__check_event_filters(event_subscription.events, clean_empty(event_subscription.to_dict()["event_filters"]))
elif event_subscription.events and event_subscription.event_filters is None and eventdescription.get("event_filters", None):
......@@ -239,6 +274,8 @@ class EventSubscriptionsOperations(Resource):
if isinstance(result, Response):
return result
event_subscription.supported_features = negotiated_supported_features["Final"]
body = clean_empty(event_subscription.to_dict())
document = mycol.update_one(my_query, {"$set":body})
document = mycol.find_one(my_query)
......@@ -247,8 +284,8 @@ class EventSubscriptionsOperations(Resource):
res = make_response(object=EventSubscription.from_dict(dict_to_camel_case(document)), status=200)
return res
except Exception as e:
exception= "An exception occurred in patching event"
current_app.logger.error(exception + "::" + str(e))
return internal_server_error(detail=exception, cause=str(e))
return internal_server_error(detail=exception, cause=str(e))
......@@ -13,6 +13,20 @@ from util import serialize_clean_camel_case
from .internal_event_ops import InternalEventOperations
TOTAL_FEATURES = 4
SUPPORTED_FEATURES_HEX = "c"
def return_negotiated_supp_feat_dict(supp_feat):
final_supp_feat = bin(int(supp_feat, 16) & int(SUPPORTED_FEATURES_HEX, 16))[2:].zfill(TOTAL_FEATURES)[::-1]
return {
"NotificationTestEvent": True if final_supp_feat[0] == "1" else False,
"NotificationWebsocket": True if final_supp_feat[1] == "1" else False,
"EnhancedEventReport": True if final_supp_feat[2] == "1" else False,
"ApiStatusMonitoring": True if final_supp_feat[3] == "1" else False,
"Final": hex(int(final_supp_feat[::-1], 2))[2:]
}
class Notifications():
......@@ -36,7 +50,7 @@ class Notifications():
data = EventNotification(sub["subscription_id"], events=event)
event_detail_redis=redis_event.get('event_detail', None)
if event_detail_redis is not None:
if EventSubscription.return_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]:
if return_negotiated_supp_feat_dict(sub["supported_features"])["EnhancedEventReport"]:
event_detail={}
current_app.logger.debug(f"event: {event_detail_redis}")
......@@ -54,13 +68,13 @@ class Notifications():
api_ids_list = event_filter.get("api_ids", None)
if api_ids_list and event_detail_redis.get('apiIds', None)[0] in api_ids_list:
event_detail["apiIds"]=event_detail_redis.get('apiIds', None)
if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
if return_negotiated_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
else:
continue
else:
event_detail["apiIds"]=event_detail_redis.get('apiIds', None)
if EventSubscription.return_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
if return_negotiated_supp_feat_dict(sub["supported_features"])["ApiStatusMonitoring"]:
event_detail["serviceAPIDescriptions"]=event_detail_redis.get('serviceAPIDescriptions', None)
elif event in ["SERVICE_API_UPDATE"]:
if event_filter:
......
......@@ -67,20 +67,8 @@ class EventSubscription(Model):
self._request_test_notification = request_test_notification
self._websock_notif_config = websock_notif_config
self._supported_features = supported_features
@classmethod
def return_supp_feat_dict(cls, supp_feat):
TOTAL_FEATURES=4
supp_feat_in_hex = int(supp_feat, 16)
supp_feat_in_bin = bin(supp_feat_in_hex)[2:].zfill(TOTAL_FEATURES)[::-1]
return {
"NotificationTestEvent": True if supp_feat_in_bin[0] == "1" else False,
"NotificationWebsocket": True if supp_feat_in_bin[1] == "1" else False,
"EnhancedEventReport": True if supp_feat_in_bin[2] == "1" else False,
"ApiStatusMonitoring": True if supp_feat_in_bin[3] == "1" else False
}
@classmethod
def from_dict(cls, dikt) -> 'EventSubscription':
"""Returns the dict as a model
......
......@@ -105,12 +105,66 @@ class SecurityOperations(Resource):
current_app.logger.error("Not found security context")
return not_found_error(detail=security_context_not_found_detail, cause=api_invoker_no_context_cause)
if not authentication_info:
for security_info_obj in services_security_object['security_info']:
del security_info_obj['authentication_info']
if not authorization_info:
for security_info_obj in services_security_object['security_info']:
del security_info_obj['authorization_info']
for security_info_obj in services_security_object['security_info']:
if security_info_obj.get('sel_security_method') == "PKI":
current_app.logger.debug("PKI security method selected")
if authentication_info:
# Read the CA certificate from the file
with open("/usr/src/app/capif_security/ca.crt", "rb") as key_file:
key_data = key_file.read()
# Decode the certificate to a string
key_data = key_data.decode('utf-8')
# Add the CA certificate to the authentication_info
security_info_obj['authentication_info'] = key_data
else:
# If authentication_info is not needed, remove the key_data
del security_info_obj['authentication_info']
if authorization_info:
security_info_obj['authorization_info'] = security_info_obj.get('authorization_info', "")
else:
# If authorization_info is not needed, remove the key_data
del security_info_obj['authorization_info']
elif security_info_obj.get('sel_security_method') == "PSK":
current_app.logger.debug("PSK security method selected")
if authentication_info:
# Read the PSK from the file -> TODO
with open("/usr/src/app/capif_security/ca.crt", "rb") as key_file:
key_data = key_file.read()
# Decode the PSK to a string
key_data = key_data.decode('utf-8')
# Add the PSK to the authentication_info
security_info_obj['authentication_info'] = key_data
else:
# If authentication_info is not needed, remove the key_data
del security_info_obj['authentication_info']
if authorization_info:
security_info_obj['authorization_info'] = security_info_obj.get('authorization_info', "UNDER DEVELOPMENT")
else:
# If authorization_info is not needed, remove the key_data
del security_info_obj['authorization_info']
elif security_info_obj.get('sel_security_method') == "OAUTH":
current_app.logger.debug("OAUTH security method selected, this request is not needed")
if authentication_info:
security_info_obj['authentication_info'] = security_info_obj.get('authentication_info', "")
else:
# If authentication_info is not needed, remove the key_data
del security_info_obj['authentication_info']
if authorization_info:
security_info_obj['authorization_info'] = security_info_obj.get('authorization_info', "")
else:
# If authorization_info is not needed, remove the key_data
del security_info_obj['authorization_info']
else:
current_app.logger.error("Bad format security method")
return bad_request_error(detail="Bad format security method", cause="Bad format security method", invalid_params=[{"param": "securityMethod", "reason": "Bad format security method"}])
properyly_json = json.dumps(
services_security_object, default=json_util.default)
......@@ -176,6 +230,22 @@ class SecurityOperations(Resource):
return not_found_error(detail=f"Service with interfaceDescription {json.dumps(clean_empty(service_instance.interface_details.to_dict()))} not found", cause="Not found Service")
# We obtain the interface security methods
# We need to go deeper here, because the interface description is an array
# and we need to find the correct one according to preferred security method by invoker,
# maybe Published API contains more than one interface description, and each one is related
# with a different security method, then we need to get a complete list (interface and related security methods)
# amd then we need to check if the preferred security method is compatible with the interface description
# also the security methods inside interface description is not mandatory, in that case we use aefProfile.securityMethods
# an also that aefProfile.securityMethods is not mandatory, only in cases described on TS 29222 - 8.2.4.2.4 Type: AefProfile -
#
# NOTE4:
# For AEFs defined by 3GPP interacting with API invokers via CAPIF-2e, at least one of the "securityMethods" attribute
# within this data type or the "securityMethods" attribute within the "interfaceDescriptions" attribute shall be present.
# For AEFs defined by 3GPP interacting with API invokers via CAPIF-2, the "securityMethods" attribute is optional.
# For AEFs not defined by 3GPP, the "securityMethods" attribute is optional.
#
# To achieve this, we need to setup at config which domains or IPs are CAPIF-2e or CAPIF-2, and then we need to check if the domain or IP of the service is in the list.
security_methods = aef_profile["aef_profiles"][0]["interface_descriptions"][0]["security_methods"]
current_app.logger.debug("Interface security methods: " + str(security_methods))
......
......@@ -3,6 +3,9 @@
VAULT_ADDR="http://$VAULT_HOSTNAME:$VAULT_PORT"
VAULT_TOKEN=$VAULT_ACCESS_TOKEN
CERTS_FOLDER="/usr/src/app/capif_security"
# cd $CERTS_FOLDER
# Maximum number of retry attempts
MAX_RETRIES=30
# Delay between retries (in seconds)
......@@ -10,6 +13,40 @@ RETRY_DELAY=10
# Attempt counter
ATTEMPT=0
while [ $ATTEMPT -lt $MAX_RETRIES ]; do
# Increment ATTEMPT using eval
eval "ATTEMPT=\$((ATTEMPT + 1))"
echo "Attempt $ATTEMPT of $MAX_RETRIES"
# Make the request to Vault and store the response in a variable
RESPONSE=$(curl -s -k --connect-timeout 5 --max-time 10 \
--header "X-Vault-Token: $VAULT_TOKEN" \
--request GET "$VAULT_ADDR/v1/secret/data/ca" | jq -r '.data.data.ca')
echo "$RESPONSE"
# Check if the response is "null" or empty
if [ -n "$RESPONSE" ] && [ "$RESPONSE" != "null" ]; then
echo "$RESPONSE" > $CERTS_FOLDER/ca.crt
openssl verify -CAfile $CERTS_FOLDER/ca.crt $CERTS_FOLDER/ca.crt
echo "CA Root successfully saved."
SUCCES_OPERATION=true
break
else
echo "Invalid response ('null' or empty), retrying in $RETRY_DELAY seconds..."
sleep $RETRY_DELAY
fi
done
if [ "$SUCCES_OPERATION" = false ]; then
echo "Error: Failed to retrieve ca root a valid response after $MAX_RETRIES attempts."
exit 1 # Exit with failure
fi
# Setup inital value to ATTEMPT and SUCCESS_OPERATION
ATTEMPT=0
SUCCES_OPERATION=false
while [ $ATTEMPT -lt $MAX_RETRIES ]; do
# Increment ATTEMPT using eval
eval "ATTEMPT=\$((ATTEMPT + 1))"
......@@ -24,16 +61,20 @@ while [ $ATTEMPT -lt $MAX_RETRIES ]; do
# Check if the response is "null" or empty
if [ -n "$RESPONSE" ] && [ "$RESPONSE" != "null" ]; then
echo "$RESPONSE" > /usr/src/app/capif_security/server.key
echo "$RESPONSE" > $CERTS_FOLDER/server.key
echo "Public key successfully saved."
gunicorn -k uvicorn.workers.UvicornH11Worker --bind 0.0.0.0:8080 \
--chdir /usr/src/app/capif_security wsgi:app
exit 0 # Exit successfully
SUCCES_OPERATION=true
break
else
echo "Invalid response ('null' or empty), retrying in $RETRY_DELAY seconds..."
sleep $RETRY_DELAY
fi
done
echo "Error: Failed to retrieve a valid response after $MAX_RETRIES attempts."
exit 1 # Exit with failure
if [ "$SUCCES_OPERATION" = false ]; then
echo "Error: Failed to retrieve server key valid response after $MAX_RETRIES attempts."
exit 1 # Exit with failure
fi
gunicorn -k uvicorn.workers.UvicornH11Worker --bind 0.0.0.0:8080 \
--chdir $CERTS_FOLDER wsgi:app
......@@ -4,7 +4,7 @@ source $(dirname "$(readlink -f "$0")")/variables.sh
help() {
echo "Usage: $1 <options>"
echo " -c : Setup different hostname for capif"
echo " -s : Run Mock server"
echo " -s : Run Mock server. Default true"
echo " -m : Run monitoring service"
echo " -l : Set Log Level (default DEBUG). Select one of: [CRITICAL, FATAL, ERROR, WARNING, WARN, INFO, DEBUG, NOTSET]"
echo " -r : Remove cached information on build"
......@@ -35,7 +35,7 @@ then
fi
# Read params
while getopts ":c:l:mshrv:f:g:b:" opt; do
while getopts ":c:l:ms:hrv:f:g:b:" opt; do
case $opt in
c)
CAPIF_HOSTNAME="$OPTARG"
......@@ -44,7 +44,7 @@ while getopts ":c:l:mshrv:f:g:b:" opt; do
MONITORING_STATE=true
;;
s)
ROBOT_MOCK_SERVER=true
ROBOT_MOCK_SERVER="$OPTARG"
;;
v)
OCF_VERSION="$OPTARG"
......
......@@ -32,6 +32,7 @@ export LOG_LEVEL=DEBUG
export CACHED_INFO=""
export BUILD_DOCKER_IMAGES=true
export REMOVE_IMAGES=false
export ROBOT_MOCK_SERVER=true
# Needed to avoid write permissions on bind volumes with prometheus and grafana
export DUID=$(id -u)
......
......@@ -1252,3 +1252,68 @@ Retrieve access token with invalid apiName at scope
Check Response Variable Type And Values ${resp} 400 AccessTokenErr
... error=invalid_scope
... error_description=One of the api names does not exist or is not associated with the aef id provided
Retrieve the Security Context of an API Invoker for PKI security method
[Tags] capif_security_api-28 smoke
# Default Invoker Registration and Onboarding
${register_user_info_invoker} ${url} ${request_body}= Invoker Default Onboarding
# Register Provider
${register_user_info_provider}= Provider Default Registration
# Publish Service API
# Create list with security methods
${security_methods}= Create List PKI
${service_api_description_published_1} ${resource_url} ${request_body}= Publish Service Api
... ${register_user_info_provider}
... service_1
... security_methods=${security_methods}
# Store apiId1
${service_api_id_1}= Set Variable ${service_api_description_published_1['apiId']}
# Create Security Context
${request_body}= Create Service Security Default Body
... ${NOTIFICATION_DESTINATION_URL}
... aef_id=${register_user_info_provider['aef_id']}
... api_id=${service_api_id_1}
${resp}= Put Request Capif
... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']}
... json=${request_body}
... server=${CAPIF_HTTPS_URL}
... verify=ca.crt
... username=${INVOKER_USERNAME}
Check Response Variable Type And Values ${resp} 201 ServiceSecurity
${resource_url}= Check Location Header ${resp} ${LOCATION_SECURITY_RESOURCE_REGEX}
${service_security_context}= Set Variable ${resp.json()}
# Retrieve Security context can setup by parameters if authenticationInfo and authorizationInfo are needed at response.
${resp}= Get Request Capif
... /capif-security/v1/trustedInvokers/${register_user_info_invoker['api_invoker_id']}?authenticationInfo=true&authorizationInfo=true
... server=${CAPIF_HTTPS_URL}
... verify=ca.crt
... username=${AEF_PROVIDER_USERNAME}
# Check Results
Check Response Variable Type And Values ${resp} 200 ServiceSecurity
# Response must accomplish:
# aefProfile must contain authenticationInfo with CA root certificate
# aefProfile must NOT CONTAIN authorizationInfo
# Create security_info to compare with response
## Read CA root certificate
${ca_root}= Read File Utf8 ca.crt
## Create a securityInfo with authenticationInfo with CA root certificate
${security_info_expected}= Add Key To Object ${service_security_context['securityInfo'][0]} authenticationInfo ${ca_root}
## Create List of securityInfo
${security_info_list}= Create List ${security_info_expected}
## Set expected securityInfo list in service_security_context
${service_security_context_filtered}= Add Key To Object ${service_security_context} securityInfo ${security_info_list}
Log Dictionary ${service_security_context_filtered}
# Check Results
Dictionaries Should Be Equal ${resp.json()} ${service_security_context_filtered}
......@@ -3,7 +3,10 @@ def create_service_api_description(api_name="service_1",
supported_features="0",
vendor_specific_service_api_description=None,
vendor_specific_aef_profile=None,
api_status=None):
api_status=None,
security_methods="default",
domain_name=None,
interface_descriptions=None):
aef_ids = list()
if isinstance(aef_id, list):
aef_ids = aef_id
......@@ -12,7 +15,33 @@ def create_service_api_description(api_name="service_1",
print("aef_id parameter is a string")
aef_ids.append(aef_id)
profiles = create_aef_profiles(aef_ids)
security_methods_normalized = list()
if security_methods is not None:
if isinstance(security_methods, list):
print("security_methods parameter is a list")
if len(security_methods) > 0:
if isinstance(security_methods[0], list):
security_methods_normalized = security_methods
else:
security_methods_normalized.append(security_methods)
elif isinstance(security_methods, str):
print("security_methods parameter is a string")
if security_methods == "default":
for idx in range(len(aef_ids)):
security_methods_normalized.append(["OAUTH"])
else:
security_methods_normalized.append([security_methods])
else:
print(f"security_methods is {security_methods}")
print(f"security_methods_normalized: {security_methods_normalized}")
else:
print("security_methods parameter is None")
profiles = create_aef_profiles(
aef_ids,
security_methods_normalized,
domain_name,
interface_descriptions)
body = {
"apiName": api_name,
......@@ -58,16 +87,32 @@ def create_service_api_description(api_name="service_1",
return body
def create_aef_profiles(aef_ids):
def create_aef_profiles(
aef_ids,
security_methods,
domain_name=None,
interface_descriptions=None):
profiles = list()
index = 1
for aef_id in aef_ids:
profiles.append(create_aef_profile(aef_id, "resource_" + str(index)))
security_method = get_value(security_methods, index-1)
print(f"aef_id: {aef_id}, security_method: {security_method}")
profiles.append(
create_aef_profile(
aef_id,
"resource_" + str(index),
security_method,
domain_name,
interface_descriptions))
index = index+1
return profiles
def create_aef_profile(aef_id, resource_name):
def create_aef_profile(aef_id,
resource_name,
security_method=None,
domain_name=None,
interface_descriptions=None):
data = {
"aefId": aef_id,
"versions": [
......@@ -90,15 +135,23 @@ def create_aef_profile(aef_id, resource_name):
],
"protocol": "HTTP_1_1",
"dataFormat": "JSON",
"securityMethods": ["PSK"],
"interfaceDescriptions": [
{
"ipv4Addr": "string",
"port": 65535,
"securityMethods": ["PSK"]
}
]
}
if domain_name is not None:
data['domainName'] = domain_name
elif interface_descriptions is not None:
data['interfaceDescriptions'] = interface_descriptions
elif domain_name is None and interface_descriptions is None:
data['interfaceDescriptions'] = [
create_interface_description(
ipv4_addr="string",
port=65535,
security_methods=security_method
)
]
if security_method is not None:
data['securityMethods'] = security_method
return data
......@@ -109,7 +162,10 @@ def create_service_api_description_patch(aef_id=None,
service_api_category=None,
api_supp_feats=None,
pub_api_path=None,
ccf_id=None):
ccf_id=None,
security_methods=None,
domain_name=None,
interface_descriptions=None):
body = dict()
# aef profiles
......@@ -122,10 +178,28 @@ def create_service_api_description_patch(aef_id=None,
elif isinstance(aef_id, str):
print("aef_id parameter is a string")
aef_ids.append(aef_id)
security_methods_normalized = list()
if security_methods is not None:
if isinstance(security_methods, list):
print("security_methods parameter is a list")
if len(security_methods) > 0:
if isinstance(security_methods[0], list):
security_methods_normalized = security_methods
else:
security_methods_normalized.append(security_methods)
elif isinstance(security_methods, str):
print("security_methods parameter is a string")
security_methods_normalized.append([security_methods])
if aef_ids is not None:
profiles = create_aef_profiles(aef_ids)
profiles = create_aef_profiles(
aef_ids,
security_methods_normalized,
domain_name,
interface_descriptions)
body['aefProfiles'] = profiles
# description
if description is not None:
body['description'] = description
......@@ -163,3 +237,38 @@ def create_service_api_description_patch(aef_id=None,
body['apiStatus']['aefIds'] = aef_ids_active
return body
def get_value(lst, index):
return lst[index] if index < len(lst) else None
def create_interface_description(ipv4_addr=None,
ipv6_addr=None,
fqdn=None,
port=None,
api_prefix=None,
security_methods=None):
"""
Create an interface description with the given parameters.
"""
# Create the interface description dictionary
data = dict()
if ipv4_addr is not None:
data['ipv4Addr'] = ipv4_addr
elif ipv6_addr is not None:
data['ipv6Addr'] = ipv6_addr
elif fqdn is not None:
data['fqdn'] = fqdn
else:
raise ValueError(
"At least one of ipv4_addr, ipv6_addr, or fqdn must be provided.")
if port is not None:
data['port'] = port
if api_prefix is not None:
data['apiPrefix'] = api_prefix
if security_methods is not None:
data['securityMethods'] = security_methods
# Return the interface description
return data
......@@ -50,6 +50,12 @@ def store_in_file(file_path, data):
f.write(bytes(data, 'utf-8'))
f.close()
def read_file_utf8(file_path: str) -> str:
try:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
except Exception as error:
raise Exception(f"Error al leer el archivo: {error}")
def cert_tuple(cert_file, key_file):
return (cert_file, key_file)
......@@ -134,6 +140,12 @@ def remove_key_from_object(input, key_to_remove):
return input_copy
def add_key_to_object(input, key_to_add, value_to_add):
input_copy = copy.deepcopy(input)
input_copy[key_to_add] = value_to_add
return input_copy
def create_scope(aef_id, api_name):
data = "3gpp#" + aef_id + ":" + api_name
......
......@@ -7,7 +7,7 @@ def create_service_security_default_body(
authentication_info=None,
authorization_info=None,
grant_type=None,
pref_security_methods=["PSK", "PKI", "OAUTH"],
pref_security_methods=["OAUTH", "PKI", "PSK"],
sel_security_method=None,
request_websocket_uri=None,
websocket_uri=None):
......@@ -32,51 +32,6 @@ def create_service_security_default_body(
return data
def create_service_security_body(notification_destination,
supported_features,
security_info=None,
aef_id=None,
api_id=None,
authentication_info=None,
authorization_info=None):
data = {
"notificationDestination": notification_destination,
"supportedFeatures": supported_features,
"securityInfo": [{
"authenticationInfo": "authenticationInfo",
"authorizationInfo": "authorizationInfo",
"interfaceDetails": {
"ipv4Addr": "127.0.0.1",
"securityMethods": ["PSK"],
"port": 5248
},
"prefSecurityMethods": ["PSK", "PKI", "OAUTH"],
}
],
"websockNotifConfig": {
"requestWebsocketUri": True,
"websocketUri": "websocketUri"
},
"requestTestNotification": True
}
if aef_id is not None and api_id is not None:
security_info = dict()
if authentication_info is not None:
security_info['authenticationInfo'] = authentication_info
if authorization_info is not None:
security_info['authorizationInfo'] = authorization_info
data['securityInfo'].append({
"authenticationInfo": "authenticationInfo",
"authorizationInfo": "authorizationInfo",
"prefSecurityMethods": ["PSK", "PKI", "OAUTH"],
"aefId": aef_id,
"apiId": api_id
})
return data
def create_security_info(
aef_id=None,
interface_details=None,
......
......@@ -796,6 +796,9 @@ Publish Service Api Request
... ${vendor_specific_aef_profile}=${None}
... ${aef_id}=${NONE}
... ${api_status}=${NONE}
... ${security_methods}=default
... ${domain_name}=${NONE}
... ${interface_descriptions}=${NONE}
${aef_ids}= Create List
IF "${aef_id}" == "${NONE}"
......@@ -826,6 +829,10 @@ Publish Service Api Request
... ${vendor_specific_service_api_description}
... ${vendor_specific_aef_profile}
... ${api_status}
... ${security_methods}
... ${domain_name}
... ${interface_descriptions}
${resp}= Post Request Capif
... /published-apis/v1/${apf_id_to_use}/service-apis
... json=${request_body}
......@@ -846,6 +853,9 @@ Publish Service Api
... ${vendor_specific_aef_profile}=${None}
... ${aef_id}=${NONE}
... ${api_status}=${NONE}
... ${security_methods}=default
... ${domain_name}=${NONE}
... ${interface_descriptions}=${NONE}
${resp} ${request_body}= Publish Service Api Request
... ${register_user_info_provider}
......@@ -857,6 +867,9 @@ Publish Service Api
... ${vendor_specific_aef_profile}
... ${aef_id}
... ${api_status}
... ${security_methods}
... ${domain_name}
... ${interface_descriptions}
Check Response Variable Type And Values ${resp} 201 ServiceAPIDescription
Dictionary Should Contain Key ${resp.json()} apiId
......