Newer
Older
os.remove(path)
break
output_path = os.path.join(
self.provider_folder, f"capif_{file_name}_{id}_api.json")
with open(output_path, "w") as outfile:
outfile.write(capif_response_text)
self.logger.info(f"CAPIF response saved to {output_path}")
output_path = os.path.join(
self.provider_folder, "provider_service_ids.json")
if os.path.exists(output_path):
with open(output_path, "r") as outfile:
provider_service_ids = json.load(outfile)
value in provider_service_ids.items() if value == id]
provider_service_ids[file_name] = id
self.provider_service_ids = provider_service_ids
json.dump(provider_service_ids, outfile, indent=4)
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
self.logger.info(
f"API '{file_name}' with ID '{id}' added to Published Apis.")
return json.loads(capif_response_text)
except requests.RequestException as e:
self.logger.error(
f"Request to CAPIF failed: {e} - Response: {response.text}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error during service publication: {e} - Response: {response.text}")
raise
def offboard_provider(self) -> None:
    """
    Offboard the provider from CAPIF and delete its local files.

    Calls ``offboard_nef`` first and ``__remove_files`` second. Any
    exception raised along the way is logged and then re-raised to the
    caller.
    """
    try:
        self.offboard_nef()
        self.__remove_files()
        self.logger.info("Provider offboarded and deregistered successfully.")
    except Exception as exc:
        # Log for diagnostics, but let the caller decide how to react.
        self.logger.error(f"Failed to offboard and deregister Provider: {exc}")
        raise
def offboard_nef(self) -> None:
    """
    Offboards the NEF (Network Exposure Function) from CAPIF.

    Reads the provider details via ``_load_provider_api_details``, builds
    the registration URL from their ``capif_registration_id`` entry and
    sends a DELETE request using the ``amf.crt`` / ``AMF_private_key.key``
    pair and ``ca.crt`` located in ``self.provider_folder``.

    :raises requests.exceptions.RequestException: If the request fails or
        CAPIF answers with an error status.
    :raises Exception: Any other failure (for example while loading the
        provider details) is logged and re-raised.
    """
    # Bound before the try block so the handlers can log safely even when
    # the failure happens before any HTTP response exists.
    response = None
    try:
        self.logger.info("Offboarding the provider")
        # Load CAPIF API details
        capif_api_details = self._load_provider_api_details()
        url = f"{self.capif_https_url}api-provider-management/v1/registrations/{capif_api_details['capif_registration_id']}"
        # Define certificate paths
        cert_paths = (
            os.path.join(self.provider_folder, "amf.crt"),
            os.path.join(self.provider_folder, "AMF_private_key.key"),
        )
        # Send DELETE request to offboard the provider
        response = requests.delete(
            url,
            cert=cert_paths,
            verify=os.path.join(self.provider_folder, "ca.crt"),
        )
        response.raise_for_status()
        self.logger.info("Offboarding performed successfully")
    except requests.exceptions.RequestException as e:
        response_text = response.text if response is not None else "<no response>"
        self.logger.error(
            f"Error offboarding Provider: {e} - Response: {response_text}")
        raise
    except Exception as e:
        response_text = response.text if response is not None else "<no response>"
        self.logger.error(
            f"Unexpected error: {e} - Response: {response_text}")
        raise
def __remove_files(self):
self.logger.info("Removing files generated")
try:
folder_path = self.provider_folder
if os.path.exists(folder_path):
# Deletes all content within the folder, including files and subfolders
for root, dirs, files in os.walk(folder_path):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
shutil.rmtree(os.path.join(root, dir))
os.rmdir(folder_path)
self.logger.info(
f"All contents in {folder_path} removed successfully.")
else:
self.logger.warning(f"Folder {folder_path} does not exist.")
except Exception as e:
self.logger.error(f"Error during removing folder contents: {e}")
raise
"""
Loads NEF API details from the CAPIF provider details JSON file.
:return: A dictionary containing NEF API details.
:raises FileNotFoundError: If the CAPIF provider details file is not found.
:raises json.JSONDecodeError: If there is an error decoding the JSON file.
"""
file_path = os.path.join(self.provider_folder,
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
try:
with open(file_path, "r") as file:
return json.load(file)
except FileNotFoundError:
self.logger.error(f"File not found: {file_path}")
raise
except json.JSONDecodeError as e:
self.logger.error(
f"Error decoding JSON from file {file_path}: {e}")
raise
except Exception as e:
self.logger.error(
f"Unexpected error while loading NEF API details: {e}")
raise
def update_provider(self):
    """
    Re-run the onboarding update for this provider.

    In order: clears certificate files (``certs_modifications``), fetches
    the post-auth information, calls ``update_onboard`` with the onboarding
    URL and access token, and hands the response, its ``apiProvDomId`` and
    the publish URL to ``__write_to_file``.
    """
    self.certs_modifications()
    auth_info = self.__save_capif_ca_root_file_and_get_auth_token()
    onboarding_url = auth_info["ccf_api_onboarding_url"]
    token = auth_info["access_token"]
    publish_url = auth_info["ccf_publish_url"]
    registration = self.update_onboard(onboarding_url, token)
    registration_id = registration["apiProvDomId"]
    self.__write_to_file(registration, registration_id, publish_url)
def certs_modifications(self):
    """
    Remove certificate-related files from ``self.provider_folder``.

    A file is removed when its name starts with one of ``cert_patterns``
    and ends with one of the known certificate suffixes. A failure to
    remove an individual file is logged and does not stop the loop.
    """
    self.logger.info("Starting certificate removal process...")
    # Suffixes that mark a file as certificate material
    cert_extensions = ("_private_key.key", "_public.csr", ".crt")
    # NOTE(review): cert_patterns is not defined in the visible part of this
    # method - confirm where the prefix list comes from.
    for file_name in os.listdir(self.provider_folder):
        has_prefix = any(file_name.startswith(pattern) for pattern in cert_patterns)
        if not (has_prefix and file_name.endswith(cert_extensions)):
            continue
        file_path = os.path.join(self.provider_folder, file_name)
        try:
            os.remove(file_path)
            self.logger.info(f"Removed certificate file: {file_name}")
        except Exception as exc:
            self.logger.error(f"Error removing {file_name}: {exc}")
    self.logger.info("Certificate removal process completed.")
def update_onboard(self, capif_onboarding_url, access_token):
self.logger.info(
"Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
capif_id = "/" + api_details["capif_registration_id"]
url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
roles = ["AMF"]
for n in range(1, self.aefs + 1):
roles.append("AEF")
for n in range(1, self.apfs + 1):
roles.append("APF")
{"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role,
"apiProvFuncInfo": f"{role.lower()}"}
for role in roles
],
"apiProvDomInfo": "This is provider",
"failReason": "string",
"regSec": access_token,
}
# Generate the indexed roles for certificate creation
indexed_roles = ["AMF"] + [f"AEF-{n}" for n in range(1, self.aefs + 1)] + [
f"APF-{n}" for n in range(1, self.apfs + 1)]
for i, api_func in enumerate(payload["apiProvFuncs"]):
found_key = False # Variable to control if a public key has already been found
for root, dirs, files in os.walk(folder_path):
for file_name in files:
if file_name.endswith(".csr"):
# Check if the file starts with the expected role
role_prefix = indexed_roles[i]
if any(file_name.startswith(prefix) and role_prefix == prefix for prefix in [f"APF-{i+1}", f"AEF-{i+1}", "AMF"]):
file_path = os.path.join(root, file_name)
with open(file_path, "r") as csr_file:
api_func["regInfo"]["apiProvPubKey"] = csr_file.read(
)
found_key = True
break
if found_key:
break
# If a file with the public key is not found, generate a new key
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
if not found_key:
public_key = self.__create_private_and_public_keys(
indexed_roles[i])
api_func["regInfo"]["apiProvPubKey"] = public_key.decode(
"utf-8")
cert = (
os.path.join(self.provider_folder, "amf.crt"),
os.path.join(self.provider_folder, "AMF_private_key.key"),
)
try:
response = requests.put(
url,
headers=headers,
data=json.dumps(payload),
cert=cert,
verify=os.path.join(self.provider_folder, "ca.crt"),
)
response.raise_for_status()
self.logger.info(
"Provider onboarded and signed certificate obtained successfully")
return response.json()
except requests.exceptions.RequestException as e:
self.logger.error(
f"Onboarding failed: {e} - Response: {response.text}")
raise
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
def _create_or_update_file(self, file_name, file_type, content, mode="w"):
"""
Create or update a file with the specified content.
:param file_name: Name of the file (without extension).
:param file_type: File type or extension (e.g., "txt", "json", "html").
:param content: Content to write into the file. Can be a string, dictionary, or list.
:param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'.
"""
# Validate the mode
if mode not in ["w", "a"]:
raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).")
# Construct the full file name
full_file_name = f"{file_name}.{file_type}"
full_path = os.path.join(self.provider_folder, full_file_name)
# Ensure the content is properly formatted
if isinstance(content, (dict, list)):
if file_type == "json":
try:
# Serialize content to JSON
content = json.dumps(content, indent=4)
except TypeError as e:
raise ValueError(f"Failed to serialize content to JSON: {e}")
else:
raise TypeError("Content must be a string when the file type is not JSON.")
elif not isinstance(content, str):
raise TypeError("Content must be a string, dictionary, or list.")
try:
# Open the file in the specified mode
with open(full_path, mode, encoding="utf-8") as file:
file.write(content)
# Log success based on the mode
if mode == "w":
self.logger.info(f"File '{full_file_name}' created or overwritten successfully.")
elif mode == "a":
self.logger.info(f"Content appended to file '{full_file_name}' successfully.")
except Exception as e:
self.logger.error(f"Error handling the file '{full_file_name}': {e}")
raise
def _find_key_by_value(self, data, target_value):
"""
Given a dictionary and a value, return the key corresponding to that value.
:param data: Dictionary to search.
:param target_value: Value to find the corresponding key for.
:return: Key corresponding to the target value, or None if not found.
"""
for key, value in data.items():
if value == target_value:
return key
return None
def _load_config_file(self, config_file: str):
"""Loads the configuration file."""
try:
with open(config_file, 'r') as file:
return json.load(file)
except FileNotFoundError:
self.logger.warning(
f"Configuration file {config_file} not found. Using defaults or environment variables.")