diff --git a/Functionalities/Config_files/config.json b/Functionalities/Config_files/config.json new file mode 100644 index 0000000000000000000000000000000000000000..76938b804a8f5c13d65e0e22c324c421d77d1f69 --- /dev/null +++ b/Functionalities/Config_files/config.json @@ -0,0 +1,40 @@ +{ + +"invoker_folder": "", + +"provider_folder": "", + +"capif_host": "", + +"register_host": "", + +"capif_https_port": "", + +"capif_register_port": "", + +"capif_callback_url": "", + +"csr_common_name": "", + +"csr_organizational_unit": "", + +"csr_organization": "", + +"crs_locality": "", + +"csr_state_or_province_name": "", + +"csr_country_name": "", + +"csr_email_address": "", + +"capif_username": "", + +"capif_password": "", + +"APFs":"", + +"AEFs":"", + +"debug_mode": "" +} diff --git a/Functionalities/Config_files/discover_filter.json b/Functionalities/Config_files/discover_filter.json new file mode 100644 index 0000000000000000000000000000000000000000..48854aa05caa3204706be45b33841344b787fb38 --- /dev/null +++ b/Functionalities/Config_files/discover_filter.json @@ -0,0 +1,15 @@ +{ + "api-name": "", + "api-version": "", + "comm-type": "", + "protocol": "", + "aef-id": "", + "data-format": "", + "api-cat": "", + "preferred-aef-loc": "", + "req-api-prov-name": "", + "supported-features": "", + "api-supported-features": "", + "ue-ip-addr": "", + "service-kpis": "" +} diff --git a/Functionalities/Config_files/publish.json b/Functionalities/Config_files/publish.json new file mode 100644 index 0000000000000000000000000000000000000000..f7cd40433ae554bc3fb5a354c97f498f53b82d7a --- /dev/null +++ b/Functionalities/Config_files/publish.json @@ -0,0 +1,5 @@ +{ + "serviceApiId":"", + "publisherAPFid":"", + "publisherAEFsids":["","",""] +} \ No newline at end of file diff --git a/Functionalities/Config_files/register.json b/Functionalities/Config_files/register.json new file mode 100644 index 0000000000000000000000000000000000000000..25367f06a80eef832e2d21d3bab8f57e9d25407f --- /dev/null +++ b/Functionalities/Config_files/register.json @@ -0,0 +1,10 @@ +{ + "register_host": "", + "capif_register_port": "", + "capif_register_username": "", + "capif_register_password": "", + "capif_username":"", + "capif_password":"", + "config_path":"", + "uuid":"" +} \ No newline at end of file diff --git a/Functionalities/capif_exposer_sample_files/provider_api_description_sample.json b/Functionalities/capif_exposer_sample_files/provider_api_description_sample.json new file mode 100755 index 0000000000000000000000000000000000000000..170432025898873254062c6c2a53e05c7ef8152f --- /dev/null +++ b/Functionalities/capif_exposer_sample_files/provider_api_description_sample.json @@ -0,0 +1,155 @@ +{ + "apiName": "Api-de-prueba-2", + "aefProfiles": [ + { + "aefId": "", + "versions": [ + { + "apiVersion": "v1", + "expiry": "2100-11-30T10:32:02.004Z", + "resources": [ + { + "resourceName": "MONITORING_SUBSCRIPTIONS", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/{scsAsId}/subscriptions", + "custOpName": "http_post", + "operations": [ + "GET", + "POST" + ], + "description": "Endpoint to manage monitoring subscriptions" + }, + { + "resourceName": "MONITORING_SUBSCRIPTION_SINGLE", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/{scsAsId}/subscriptions/{subscriptionId}", + "custOpName": "http_get", + "operations": [ + "GET", + "PUT", + "DELETE" + ], + "description": "Endpoint to manage single subscription" + } + ], + "custOperations": [ + { + "commType": "REQUEST_RESPONSE", + "custOpName": "string", + "operations": [ + "POST" + ], + "description": "string" + } + ] + } + ], + "protocol": "HTTP_1_1", + "dataFormat": "JSON", + "securityMethods": [ + "Oauth", + "PSK" + ], + "interfaceDescriptions": [ + { + "ipv4Addr": "127.0.0.1", + "port": 8888, + "securityMethods": [ + "Oauth" + ] + } + ] + }, + { + "aefId": "", + "versions": [ + { + "apiVersion": "v1", + "expiry": "2100-11-30T10:32:02.004Z", + "resources": [ + { + "resourceName": "TSN_LIST_PROFILES", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/profile", + "custOpName": "http_get", + "operations": [ + "GET" + ], + "description": "Endpoint for retrieving the list of available TSN profiles" + }, + { + "resourceName": "TSN_DETAIL_PROFILE", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/profile?name={profileName}", + "custOpName": "http_get", + "operations": [ + "GET" + ], + "description": "Endpoint for retrieving information about a single TSN profile" + }, + { + "resourceName": "TSN_APPLY_CONFIGURATION", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/apply", + "custOpName": "http_post", + "operations": [ + "POST" + ], + "description": "Endpoint for configuring TSN connection parameters" + }, + { + "resourceName": "TSN_CLEAR_CONFIGURATION", + "commType": " SUBSCRIBE_NOTIFY", + "uri": "/clear", + "custOpName": "http_post", + "operations": [ + "POST" + ], + "description": "Endpoint for removing a previous TSN connection configuration" + } + ], + "custOperations": [ + { + "commType": "REQUEST_RESPONSE", + "custOpName": "string", + "operations": [ + "POST" + ], + "description": "string" + } + ] + } + ], + "protocol": "HTTP_1_1", + "dataFormat": "JSON", + "securityMethods": [ + "Oauth" + ], + "interfaceDescriptions": [ + { + "ipv4Addr": "127.0.0.1", + "port": 8899, + "securityMethods": [ + "Oauth" + ] + } + ] + } + ], + "description": "API of dummy netapp to test", + "supportedFeatures": "fffff", + "shareableInfo": { + "isShareable": true, + "capifProvDoms": [ + "string" + ] + }, + "serviceAPICategory": "string", + "apiSuppFeats": "fffff", + "pubApiPath": { + "ccfIds": [ + "string" + ] + }, + "ccfId": "string" +} \ No newline at end of file diff --git a/Functionalities/deregister_and_login.py b/Functionalities/deregister_and_login.py new file mode 100644 index 0000000000000000000000000000000000000000..317c9bbe02403e4aba9f6eb28a1f19d1e4f2bea3 --- /dev/null +++ b/Functionalities/deregister_and_login.py @@ -0,0 +1,86 @@ +import json +import logging +import requests +import urllib3 +import emulator_utils +from requests.auth import HTTPBasicAuth +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +logging.basicConfig( + level=logging.INFO, # Nivel mÃnimo de severidad a registrar + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Formato del mensaje de log + handlers=[ + logging.FileHandler("register_logs.log"), # Registra en un archivo + logging.StreamHandler() # También muestra en la consola + ] +) + + +def main(): + + variables=__load_config_file(config_file=emulator_utils.get_register_file()) + log_result = __log_to_capif(variables) + admintoken = log_result["access_token"] + de_register_from_capif(admintoken,variables) + + logger.info("User eliminated succesfully") + + +def __log_to_capif(variables): + logger.info("Logging in to CAPIF") + capif_register_url="https://" + variables["register_host"].strip()+ ":" + variables["capif_register_port"] + "/" + try: + url = capif_register_url + "login" + + response = requests.request( + "POST", + url, + headers={"Content-Type": "application/json"}, + auth=HTTPBasicAuth(variables["capif_register_username"], variables["capif_register_password"]), + verify=False, + ) + response.raise_for_status() + response_payload = json.loads(response.text) + logger.info("Logged in to CAPIF successfully") + return response_payload + except Exception as e: + logger.error(f"Error during login to CAPIF: {e}") + raise + +def de_register_from_capif(admin_token,variables): + logger.info("Deleting CAPIF user") + capif_register_url="https://" + variables["register_host"].strip()+ ":" + variables["capif_register_port"] + "/" + + + url = capif_register_url + "deleteUser/" + variables["uuid"] + + headers = { + "Authorization": "Bearer {}".format(admin_token), + "Content-Type": "application/json", + } + response = requests.request( + "DELETE", + url, + headers=headers, + data=None, + verify=False + ) + response.raise_for_status() + logger.info("User deleted") + +def __load_config_file(config_file: str): + """Carga el archivo de configuración.""" + try: + with open(config_file, 'r') as file: + return json.load(file) + except FileNotFoundError: + logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.") + return {} + + + +if __name__ == "__main__": + logger = logging.getLogger("CAPIF Register") + logger.info("Initializing CAPIF Register") + main() \ No newline at end of file diff --git a/Functionalities/emulator_utils.py b/Functionalities/emulator_utils.py new file mode 100755 index 0000000000000000000000000000000000000000..14408759f2abb58dee937b07fe0c6aa540373b45 --- /dev/null +++ b/Functionalities/emulator_utils.py @@ -0,0 +1,16 @@ + + +def get_config_file()-> str : + return "/Users/IDB0128/Documents/OpenCapif/SDK-S6G/Functionalities/Config_files/config.json" + +def get_register_file()-> str : + return "/Users/IDB0128/Documents/OpenCapif/SDK-S6G/Functionalities/Config_files/register.json" + +def provider_exposer_get_sample_api_description_path() -> str: + return "/Users/IDB0128/Documents/OpenCapif/SDK-S6G/Functionalities/capif_exposer_sample_files/provider_api_description_sample.json" + +def get_provider_config_file()->str: + return "/Users/IDB0128/Documents/OpenCapif/SDK-S6G/Functionalities/Config_files/Provider_config.json" + +def get_sdk_folder()-> str: + return "/Users/IDB0128/Documents/OpenCapif/SDK-S6G/" \ No newline at end of file diff --git a/Functionalities/invoker_capif_connector.py b/Functionalities/invoker_capif_connector.py new file mode 100755 index 0000000000000000000000000000000000000000..cfd1bccaccb0c8e299b4c4a9149a5d6034a83462 --- /dev/null +++ b/Functionalities/invoker_capif_connector.py @@ -0,0 +1,27 @@ + +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFInvokerConnector + + + +def showcase_capif_connector(): + """ + This method showcases how one can use the CAPIFConnector class. + + """ + + capif_connector = CAPIFInvokerConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.register_and_onboard_Invoker() + print("COMPLETED") + +if __name__ == "__main__": + #Let's register invoker to CAPIF. This should happen exactly once + showcase_capif_connector() + + + diff --git a/Functionalities/invoker_capif_connector_offboarding.py b/Functionalities/invoker_capif_connector_offboarding.py new file mode 100755 index 0000000000000000000000000000000000000000..b30dc77a9381332b752f803adca542228b1fe29f --- /dev/null +++ b/Functionalities/invoker_capif_connector_offboarding.py @@ -0,0 +1,26 @@ +import sys +import os + +# Agrega la ruta del archivo al sys.path +sys.path.append('/Users/IDB0128/Documents/OpenCapif/SDK-S6G') + + +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFInvokerConnector + +import emulator_utils + +def showcase_offboard_and_deregister_invoker(): + capif_connector = CAPIFInvokerConnector(config_file=emulator_utils.get_config_file()) + capif_connector.offboard_and_deregister_Invoker() + print("COMPLETED") + + +if __name__ == "__main__": + showcase_offboard_and_deregister_invoker() + + diff --git a/Functionalities/invoker_service_discovery.py b/Functionalities/invoker_service_discovery.py new file mode 100755 index 0000000000000000000000000000000000000000..0b0d06c742ba6caef6a1ff453dff83eac5d20dfc --- /dev/null +++ b/Functionalities/invoker_service_discovery.py @@ -0,0 +1,20 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import ServiceDiscoverer + + + +def showcase_access_token_retrieval_from_capif(): + service_discoverer = ServiceDiscoverer(config_file=emulator_utils.get_config_file()) + service_discoverer.discover() + + +if __name__ == "__main__": + #The following code assumes that you have already registered the net app to capif. + #showcase_service_discovery() + #showcase_retrieve_endpoint_url_from_tsn() + showcase_access_token_retrieval_from_capif() + print("COMPLETED") diff --git a/Functionalities/invoker_service_get_token.py b/Functionalities/invoker_service_get_token.py new file mode 100755 index 0000000000000000000000000000000000000000..4fe52a96ba20361f42d8cbc98ab199bba499e4c2 --- /dev/null +++ b/Functionalities/invoker_service_get_token.py @@ -0,0 +1,21 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import ServiceDiscoverer + + + + +def showcase_access_token_retrieval_from_capif(): + service_discoverer = ServiceDiscoverer(config_file=emulator_utils.get_config_file()) + service_discoverer.get_tokens() + + +if __name__ == "__main__": + #The following code assumes that you have already registered the net app to capif. + #showcase_service_discovery() + #showcase_retrieve_endpoint_url_from_tsn() + showcase_access_token_retrieval_from_capif() + print("COMPLETED") diff --git a/Functionalities/provider_capif_connector.py b/Functionalities/provider_capif_connector.py new file mode 100755 index 0000000000000000000000000000000000000000..8fc6bfa9fc8a096f681bb866bb1d9e816eb0c3c5 --- /dev/null +++ b/Functionalities/provider_capif_connector.py @@ -0,0 +1,20 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.register_and_onboard_provider() + + + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/provider_capif_connector_offboarding.py b/Functionalities/provider_capif_connector_offboarding.py new file mode 100755 index 0000000000000000000000000000000000000000..8df73594fa68559e57c81c7a2622266384217957 --- /dev/null +++ b/Functionalities/provider_capif_connector_offboarding.py @@ -0,0 +1,20 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def offboard_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.offboard_and_deregister_nef() + print("COMPLETED") + + + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + offboard_capif_nef_connector() diff --git a/Functionalities/provider_get_all_published_api.py b/Functionalities/provider_get_all_published_api.py new file mode 100644 index 0000000000000000000000000000000000000000..468d87ab58a0db3c7355c13049fcbea4f326251e --- /dev/null +++ b/Functionalities/provider_get_all_published_api.py @@ -0,0 +1,18 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.get_all_services() + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/provider_get_published_api.py b/Functionalities/provider_get_published_api.py new file mode 100644 index 0000000000000000000000000000000000000000..13eb933b3d6c9da821ab88dc491d2a6a566d75c0 --- /dev/null +++ b/Functionalities/provider_get_published_api.py @@ -0,0 +1,18 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.get_service() + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/provider_publish_api.py b/Functionalities/provider_publish_api.py new file mode 100644 index 0000000000000000000000000000000000000000..0a3ced1331898ea9a629d52600fe4c501bd3ab25 --- /dev/null +++ b/Functionalities/provider_publish_api.py @@ -0,0 +1,19 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.publish_services( + service_api_description_json_full_path=emulator_utils.provider_exposer_get_sample_api_description_path()) + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/provider_unpublish_api.py b/Functionalities/provider_unpublish_api.py new file mode 100644 index 0000000000000000000000000000000000000000..1cc8157e9f68f144fc966c13647ac6b0167821b0 --- /dev/null +++ b/Functionalities/provider_unpublish_api.py @@ -0,0 +1,18 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.unpublish_service() + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/provider_update_api.py b/Functionalities/provider_update_api.py new file mode 100644 index 0000000000000000000000000000000000000000..e9cfb276668d5a73422108affb00cf63802a3025 --- /dev/null +++ b/Functionalities/provider_update_api.py @@ -0,0 +1,19 @@ +import emulator_utils +import sys +sys.path.insert(0, emulator_utils.get_sdk_folder) + +# Ahora importa las clases desde tu archivo sdk.py +from sdk import CAPIFProviderConnector +def showcase_capif_nef_connector(): + """ + + """ + capif_connector = CAPIFProviderConnector(config_file=emulator_utils.get_config_file()) + + capif_connector.update_service( + service_api_description_json_full_path=emulator_utils.provider_exposer_get_sample_api_description_path()) + print("COMPLETED") + +if __name__ == "__main__": + #Let's register a NEF to CAPIF. This should happen exactly once + showcase_capif_nef_connector() diff --git a/Functionalities/register_and_login.py b/Functionalities/register_and_login.py new file mode 100644 index 0000000000000000000000000000000000000000..30e66fcfddd6bdbba42b91bdf2ecb22cc8cc7ce7 --- /dev/null +++ b/Functionalities/register_and_login.py @@ -0,0 +1,110 @@ +import json +import logging +import requests +import urllib3 +import emulator_utils +from requests.auth import HTTPBasicAuth +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +logging.basicConfig( + level=logging.INFO, # Nivel mÃnimo de severidad a registrar + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Formato del mensaje de log + handlers=[ + logging.FileHandler("register_logs.log"), # Registra en un archivo + logging.StreamHandler() # También muestra en la consola + ] +) + + +def main(): + + variables=__load_config_file(config_file=emulator_utils.get_register_file()) + log_result = __log_to_capif(variables) + admintoken = log_result["access_token"] + postcreation = __create_user(admintoken,variables) + uuid = postcreation["uuid"] + __write_to_file(uuid,variables) + logger.info(uuid) + +def __log_to_capif(variables): + logger.info("Logging in to CAPIF") + capif_register_url="https://" + variables["register_host"].strip()+ ":" + variables["capif_register_port"] + "/" + try: + url = capif_register_url + "login" + + response = requests.request( + "POST", + url, + headers={"Content-Type": "application/json"}, + auth=HTTPBasicAuth(variables["capif_register_username"], variables["capif_register_password"]), + verify=False, + ) + response.raise_for_status() + response_payload = json.loads(response.text) + logger.info("Logged in to CAPIF successfully") + return response_payload + except Exception as e: + logger.error(f"Error during login to CAPIF: {e}") + raise + + +def __create_user(admin_token,variables): + logger.info("Creating user in CAPIF") + capif_register_url="https://" + variables["register_host"].strip()+ ":" + variables["capif_register_port"] + "/" + try: + url = capif_register_url + "createUser" + payload = { + "username": variables["capif_username"], + "password": variables["capif_password"], + "description": "description", + "email": "csr_email_address@tid.es", + "enterprise": "csr_organization", + "country": "crs_locality", + "purpose": "SDK for SAFE 6G", + } + headers = { + "Authorization": "Bearer {}".format(admin_token), + "Content-Type": "application/json", + } + + response = requests.request( + "POST", url, headers=headers, data=json.dumps(payload), verify=False + ) + response.raise_for_status() + response_payload = json.loads(response.text) + logger.info("User created successfully") + return response_payload + except Exception as e: + logger.error(f"Error during user creation in CAPIF: {e}") + raise + +def __load_config_file(config_file: str): + """Carga el archivo de configuración.""" + try: + with open(config_file, 'r') as file: + return json.load(file) + except FileNotFoundError: + logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.") + return {} + +def __write_to_file(uuid, variables): + logger.info("Saving uuid in config.json") + + # Abrimos el archivo y leemos su contenido + with open(variables["config_path"] + "config.json", "r") as infile: + data = json.load(infile) + + # Modificamos el contenido del archivo para incluir el nuevo UUID + data["uuid"] = uuid + + # Escribimos el contenido actualizado de nuevo en el archivo + with open(variables["config_path"] + "config.json", "w") as outfile: + json.dump(data, outfile, indent=4) + + logger.info("Data saved") + +if __name__ == "__main__": + logger = logging.getLogger("CAPIF Register") + logger.info("Initializing CAPIF Register") + main() \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..04c2533ea0aaeebf182e7172e1de8784d8b73b88 --- /dev/null +++ b/README.md @@ -0,0 +1,237 @@ + + +# SDK-S6G + +This tool is focused on connect to CAPIF in a simpler way. + + + +# Functionalities + +- **Invoker Capif connector**: Simplifies the process of onboarding for Invoker users + +- **Provider Capif connector**: Simplifies the process of onboarding for Provider users,also has the capability to register several APF's and AEF's if its necesary. + +- **Invoker Service Discovery**: Facilitates making a Discovery request to CAPIF, also stores the API services recieved and has an option to filter them. + +- **Invoker Service Get token**: After the Discovery, this functionality simplifies the way of getting created their propperly security context for each of the services and adquiring the access token to use the final API's + +- **Provider Publish Api**: Simplifies the process of publishing an API. Also has the capability to chose which APF and AEF's will be used to publish the API + +- **Provider Unpublish Api**: Simplifies the process of deleting an API. + +- **Provider Update Api**: Simplifies the process of updating an API. Also has the capability to chose which APF and AEF's will be used to update the API + +- **Provider Get Api**: Simplifies the process of recieving the information of One service published previously + +- **Provider Get all Apis**: Simplifies the process of recieving the information of all available services published previously. + +- **Invoker Capif connector offboarding**: Simplifies the process of offboarding for Invoker users + +- **Provider Capif connector offboarding**: Simplifies the process of offboarding for Provider users + + + + +## Other Functionalities + +Apart from the SDK it is available diferent functionalities for development reasons + +- **Register and login**: Facilitates the loggin process for admin users and creates a CAPIF user +- **Deregister and login**: Facilitates the loggin process for admin users and eliminates a CAPIF user + + + + + +# Installation + +To use SDK-S6G we must follow this path for his Installation. + +1 - Create an enviroment with pyenv + + #Comands to install the enviroment + pyenv install 3.12 + pyenv virtualenv 3.12 Sdkenviroment + + #OPTIONAL + #Sometimes Mac shells has a little trouble while finding the shell path, try this command + export PATH="$HOME/.pyenv/bin:$PATH" + eval "$(pyenv init --path)" + eval "$(pyenv init -)" + eval "$(pyenv virtualenv-init -)" +2 - Clone the repository + + git clone -b v1 --single-branch https://github.com/JorgeEcheva26/SDK-S6G.git + + + #Then move to the SDK-S6G folder + + cd /your/path/to/SDK-S6G + +3 - Install the requirements.txt file + + cd Safe-6g.egg-info + + python -m pip install --upgrade pip + + pip install -r requirements.txt + +Congratulations! You ended the installation for SDK-S6G + + + +# How to use SDK-S6G + +1 - First we need to complete the emulator utils file with our absolute paths in order to complete the configuration of the SDK.The register file is not needed for the use of the SDK.The provider_exposer_get_sample_api_description_path is obligatory if we want to use the publish functionalities. + +2 - Then we need to fill out Config files depending on the functionalities we want to use from the SDK + +## Config.json + + "invoker_folder": String | The path (relative or absolute) of the folder you want to store your invoker information + + "provider_folder": String | The path (relative or absolute) of the folder you want to store your invoker information + + "capif_host": String | The domain name of your capif host + + "register_host": String | The domain name of your register host + + "capif_https_port": Integer | The port of your capif host + + "capif_register_port": Integer | The port of your register host + + "capif_callback_url": String | The Url you want to recieve CAPIF notifications(This functionality is not currently available) + + "csr_common_name": String | Information for your invoker certificate + + "csr_organizational_unit": String | Information for your invoker certificate + + "csr_organization": String | Information for your invoker certificate + + "crs_locality": String | Information for your invoker certificate + + "csr_state_or_province_name": String |Information for your invoker certificate + + "csr_country_name": String | Information for your invoker certificate + + "csr_email_address": String | Information for your invoker certificate + + "capif_username": String | CAPIF username + + "capif_password": String | CAPIF password + + "APFs": Integer | Number of APF's you want to onboard as a provider Example:5 + + "AEFs": Integer | Number of AEF's you want to onboard as a provider Example:2 + + "debug_mode": Boolean | If you want to recieve logs from SDK-S6G Example:True/False + +Required fields no matter you onboard as an invoker or provider: + +- Capif_host +- register_host +- capif_https_port +- capif_register_port +- capif_username +- capif_password +- debug_mode + +If you want to use SDK as an Invoker you need to fill out these fields + +- invoker_folder +- capif_callback_url +- csr_information(csr_common_name,csr_country_name...) + +If you want to use SDK as a Provider you need to fill out these fields + +- provider_folder +- APFs +- AEFs + + + +## Publish.json + + "serviceApiId": String | The Api id we want to use Example "02eff6e1b3a8f7c8044a92ee8a30bd" + "publisherAPFid": String | APF id we chose to use Example : "APFa165364a379035d14311deadc04332" + "publisherAEFsids": Array | Array of strings filled out of AEFs ids we want to use Example: ["AEFfa38f0e855bffb420e4994ecbc8fb9","AEFe8bfa711f4f0c95ba0b382508e6382"] + +ServiceApiId is required in: +- Provider Unpublish Api +- Provider Update Api +- Provider Get api + +PublisherAPFid is required in: + +- Provider Publish Api +- Provider Unpublish Api +- Provider Update Api +- Provider Get Api +- Provider Get all Apis + +PublisherAEFsids is required in: + +- Provider Publish Api +- Provider Unpublish Api +- Provider Update Api + +For using the Publish Api function or the Update function you **must** modify the provider_api_description_sample.json with the Publish API that you want to share following the standard schema for [ServiceAPIDescription](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Publish_Service_API.yaml) + +You won't need to fill out the aefIds fields from aefProfiles array because you would already be configurating this fields by completing publisherAEFsids parameter + +If the publisherAEFsids parameter don't match with the aefProfiles you will recieve an error + +### Important information for Provider users + +In the provider_folder, you will find several folders with each capif_username you have onboarded as a provider, for each folder you could find: + +- Capif_provider_details.json : Contains all the APFs and AEFs ids you have already onboarded with this capif_username +- CAPIF_provider_api_description_sample.json : If you already published or updated an API, you will find a copy of your last payload. +- Service_received.json : If you already used the get an api or get all apis functionality, you will find the response to your request. +- Published-Apis.json : Constains the currently published APIs with their ApiId + +## Discover_filter.json +This file follows the parameters schema from the GET petition of [Discover Services API](https://github.com/jdegre/5GC_APIs/blob/Rel-18/TS29222_CAPIF_Discover_Service_API.yaml) + +To use this feature you must complete the file with the parameters you want to be filtered and then run the Invoker Service Discovery Functionality. + +To run the Invoker Service Discovery Functionality you must have onboarded as an Invoker before. + +### Important information for Invoker users + +In the `invoker_folder`, you will find several folders with each `capif_username` you have onboarded as a provider. For each folder, you could find: + +- `Capif_api_security_context_details.json`: This file contains the information of your invoker. It will contain: + + 1. Your `api_invoker_id`. + 2. If you have already used the Service Discovery Functionality, you will find all the available APIs with their information. + 3. If you have already used the Service Get Token functionality, you will find your access token for using the APIs you have already discovered. + +By default, the Service Get Token will get the access token for using all the APIs that are available. So if you want to filter the APIs and reach only the API you want, you must: + +1. Complete your `Discover_filter.json` file. +2. Use the Service Discovery Functionality. +3. Use the Service Get Token Functionality. + +## Register.json + + + "register_host": String | The domain name of your register host + "capif_register_port": Integer | The port of your register host + "capif_register_username": String | CAPIF admin username + "capif_register_password": String | CAPIF admin password + "capif_username": String | CAPIF user username + "capif_password": String | CAPIF user password + "config_path": String | Absolute path to the Config_files folder + "uuid": String | UUID for Deregistering the user + + +This file is only used for the Functionalities of : +- Register and login +- Deregister and login + +Each field is obligatory to complete except UUID, which is only obligatory in case of Deregistering the user. + +Although this field is not obligatory we recomend to store the UUID parameter recieved by the Register and login functionality in this field. + diff --git a/Safe-6g/sdk.py b/Safe-6g/sdk.py new file mode 100644 index 0000000000000000000000000000000000000000..7f6983534590e3f8d6232bad4337b6ac7e33c444 --- /dev/null +++ b/Safe-6g/sdk.py @@ -0,0 +1,1717 @@ +import os +import logging +import shutil +import subprocess +from requests.auth import HTTPBasicAuth +import urllib3 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# Ahora realiza tu solicitud HTTPS a 'localhost' + +from OpenSSL.SSL import FILETYPE_PEM +from OpenSSL.crypto import ( + dump_certificate_request, + dump_privatekey, + load_publickey, + PKey, + TYPE_RSA, + X509Req, + dump_publickey, +) +import requests +import json +from uuid import uuid4 +import warnings +from requests.exceptions import RequestsDependencyWarning +warnings.filterwarnings("ignore", category=RequestsDependencyWarning) + +# Configuración básica del logger +logging.basicConfig( + level=logging.NOTSET, # Nivel mÃnimo de severidad a registrar + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Formato del mensaje de log + handlers=[ + logging.FileHandler("sdk_logs.log"), # Registra en un archivo + logging.StreamHandler() # También muestra en la consola + ] +) + +class CAPIFInvokerConnector: + """ + Τhis class is responsbile for onboarding an Invoker (ex. a Invoker) to CAPIF + """ + def __init__(self, + config_file: str ): + + config_file = os.path.abspath(config_file) + # Cargar configuración desde archivo si es necesario + config = self.__load_config_file(config_file) + + debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower() + if debug_mode=="false": debug_mode=False + + # Inicializar logger + self.logger = logging.getLogger(self.__class__.__name__) + if debug_mode: + self.logger.setLevel(logging.DEBUG) + else: + self.logger.setLevel(logging.WARNING) + + + + + urllib_logger = logging.getLogger("urllib3") + if not debug_mode: + urllib_logger.setLevel(logging.WARNING) + else: + urllib_logger.setLevel(logging.DEBUG) + + self.logger.info("Initializing CAPIFInvokerConnector") + + # Asignar valores desde variables de entorno o desde el archivo de configuración + + invoker_general_folder = os.path.abspath(os.getenv('invoker_folder', config.get('invoker_folder', '')).strip()) + + capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip() + register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip() + capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip()) + capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip()) + capif_invoker_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip() + capif_invoker_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip() + capif_callback_url = os.getenv('CAPIF_CALLBACK_URL', config.get('capif_callback_url', '')).strip() + + csr_common_name = os.getenv('CSR_COMMON_NAME', config.get('csr_common_name', '')).strip() + csr_organizational_unit = os.getenv('CSR_ORGANIZATIONAL_UNIT', config.get('csr_organizational_unit', '')).strip() + csr_organization = os.getenv('CSR_ORGANIZATION', config.get('csr_organization', '')).strip() + crs_locality = os.getenv('CRS_LOCALITY', config.get('crs_locality', '')).strip() + csr_state_or_province_name = os.getenv('CSR_STATE_OR_PROVINCE_NAME', config.get('csr_state_or_province_name', '')).strip() + csr_country_name = os.getenv('CSR_COUNTRY_NAME', config.get('csr_country_name', '')).strip() + csr_email_address = os.getenv('CSR_EMAIL_ADDRESS', config.get('csr_email_address', '')).strip() + + self.invoker_folder=os.path.join(invoker_general_folder,capif_invoker_username) + os.makedirs(self.invoker_folder, exist_ok=True) + # Resto del código original para inicializar URLs y otros atributos + + + if len(capif_https_port) == 0 or int(capif_https_port) == 443: + self.capif_https_url = "https://" + capif_host.strip() + "/" + else: + self.capif_https_url = ( + "https://" + capif_host.strip() + ":" + capif_https_port.strip() + "/" + ) + + if len(capif_register_port) == 0: + self.capif_register_url = "https://" + register_host.strip() + ":8084/" + else: + self.capif_register_url = ( + "https://" + register_host.strip() + ":" + capif_register_port.strip() + "/" + ) + + self.capif_callback_url = self.__add_trailing_slash_to_url_if_missing( + capif_callback_url.strip() + ) + + self.capif_invoker_username = capif_invoker_username + self.capif_invoker_password = capif_invoker_password + + self.csr_common_name = "invoker_" + csr_common_name + self.csr_organizational_unit = csr_organizational_unit + self.csr_organization = csr_organization + self.crs_locality = crs_locality + self.csr_state_or_province_name = csr_state_or_province_name + self.csr_country_name = csr_country_name + self.csr_email_address = csr_email_address + self.capif_api_details_filename = "capif_api_security_context_details-"+self.capif_invoker_username+".json" + #self.capif_api_details = self.__load_invoker_api_details() + + self.logger.info("CAPIFInvokerConnector initialized with the config.json parameters") + + def __load_config_file(self, config_file: str): + """Carga el archivo de configuración.""" + try: + with open(config_file, 'r') as file: + return json.load(file) + except FileNotFoundError: + self.logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.") + return {} + + def __add_trailing_slash_to_url_if_missing(self, url): + if url[len(url) - 1] != "/": + url = url + "/" + return url + + def register_and_onboard_Invoker(self) -> None: + self.logger.info("Registering and onboarding Invoker") + try: + public_key = self.__create_private_and_public_keys() + capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token() + capif_onboarding_url = capif_postauth_info["ccf_onboarding_url"] + capif_discover_url = capif_postauth_info["ccf_discover_url"] + capif_access_token = capif_postauth_info["access_token"] + api_invoker_id = self.__onboard_invoker_to_capif_and_create_the_signed_certificate( + public_key, capif_onboarding_url, capif_access_token + ) + self.__write_to_file( api_invoker_id, capif_discover_url) + self.logger.info("Invoker registered and onboarded successfully") + except Exception as e: + self.logger.error(f"Error during Invoker registration and onboarding: {e}") + raise + + def __load_invoker_api_details(self): + self.logger.info("Loading Invoker API details") + path = os.path.join( + self.invoker_folder, + self.capif_api_details_filename + ) + with open( + path, "r" + ) as openfile: + return json.load(openfile) + + def __offboard_Invoker(self) -> None: + self.logger.info("Offboarding Invoker") + try: + capif_api_details = self.__load_invoker_api_details() + url = ( + self.capif_https_url + + "api-invoker-management/v1/onboardedInvokers/" + + capif_api_details["api_invoker_id"] + ) + + signed_key_crt_path = os.path.join( + self.invoker_folder, + capif_api_details["user_name"] + ".crt" + ) + + private_key_path = os.path.join( + self.invoker_folder, + "private.key" + ) + + path = os.path.join( + self.invoker_folder, + "ca.crt" + ) + response = requests.request( + "DELETE", + url, + cert=(signed_key_crt_path, private_key_path), + verify=path, + ) + response.raise_for_status() + self.logger.info("Invoker offboarded successfully") + except Exception as e: + self.logger.error(f"Error during Invoker offboarding: {e}") + raise + + def offboard_and_deregister_Invoker(self) -> None: + self.logger.info("Offboarding and deregistering Invoker") + try: + self.__offboard_Invoker() + self.__remove_files() + self.logger.info("Invoker offboarded and deregistered successfully") + except Exception as e: + self.logger.error(f"Error during Invoker offboarding and deregistering: {e}") + raise + + def __create_private_and_public_keys(self) -> str: + self.logger.info("Creating private and public keys for the Invoker cert") + try: + private_key_path = os.path.join(self.invoker_folder, "private.key") + + csr_file_path = os.path.join(self.invoker_folder, "cert_req.csr") + + key = PKey() + key.generate_key(TYPE_RSA, 2048) + + req = X509Req() + req.get_subject().CN = self.csr_common_name + req.get_subject().O = self.csr_organization + req.get_subject().OU = self.csr_organizational_unit + req.get_subject().L = self.crs_locality + req.get_subject().ST = self.csr_state_or_province_name + req.get_subject().C = self.csr_country_name + req.get_subject().emailAddress = self.csr_email_address + req.set_pubkey(key) + req.sign(key, "sha256") + + with open(csr_file_path, "wb+") as f: + f.write(dump_certificate_request(FILETYPE_PEM, req)) + public_key = dump_certificate_request(FILETYPE_PEM, req) + with open(private_key_path, "wb+") as f: + f.write(dump_privatekey(FILETYPE_PEM, key)) + + self.logger.info("Keys created successfully") + return public_key + except Exception as e: + self.logger.error(f"Error during key creation: {e}") + raise + + + + def __remove_files(self): + self.logger.info("Removing files generated") + try: + folder_path = self.invoker_folder + + if os.path.exists(folder_path): + # Elimina todo el contenido dentro de la carpeta, incluyendo archivos y subcarpetas + for root, dirs, files in os.walk(folder_path): + for file in files: + os.remove(os.path.join(root, file)) + for dir in dirs: + shutil.rmtree(os.path.join(root, dir)) + os.rmdir(folder_path) + self.logger.info(f"All contents in {folder_path} removed successfully.") + else: + self.logger.warning(f"Folder {folder_path} does not exist.") + except Exception as e: + self.logger.error(f"Error during removing folder contents: {e}") + raise + + + + + def __save_capif_ca_root_file_and_get_auth_token(self): + self.logger.info("Saving CAPIF CA root file and getting auth token with user and password given by the CAPIF administrator") + try: + url = self.capif_register_url + "getauth" + + response = requests.request( + "GET", + url, + headers={"Content-Type": "application/json"}, + auth=HTTPBasicAuth(self.capif_invoker_username, self.capif_invoker_password), + verify=False, + ) + + response.raise_for_status() + response_payload = json.loads(response.text) + ca_root_file_path = os.path.join(self.invoker_folder, "ca.crt") + ca_root_file = open(ca_root_file_path, "wb+") + ca_root_file.write(bytes(response_payload["ca_root"], "utf-8")) + self.logger.info("CAPIF CA root file saved and auth token obtained successfully") + return response_payload + except Exception as e: + self.logger.error(f"Error during saving CAPIF CA root file and getting auth token: {e}") + raise + + def __onboard_invoker_to_capif_and_create_the_signed_certificate( + self, public_key, capif_onboarding_url, capif_access_token + ): + self.logger.info("Onboarding Invoker to CAPIF and creating signed certificate by giving our public key to CAPIF") + try: + url = self.capif_https_url + capif_onboarding_url + payload_dict = { + "notificationDestination": self.capif_callback_url, + "supportedFeatures": "fffffff", + "apiInvokerInformation": self.csr_common_name, + "websockNotifConfig": { + "requestWebsocketUri": True, + "websocketUri": "websocketUri", + }, + "onboardingInformation": {"apiInvokerPublicKey": str(public_key, "utf-8")}, + "requestTestNotification": True, + } + payload = json.dumps(payload_dict) + headers = { + "Authorization": "Bearer {}".format(capif_access_token), + "Content-Type": "application/json", + } + pathca = os.path.join(self.invoker_folder,"ca.crt") + response = requests.request( + "POST", + url, + headers=headers, + data=payload, + verify=pathca, + ) + response.raise_for_status() + response_payload = json.loads(response.text) + name=self.capif_invoker_username+".crt" + pathcsr = os.path.join(self.invoker_folder, name) + certification_file = open( + pathcsr, "wb" + ) + certification_file.write( + bytes( + response_payload["onboardingInformation"]["apiInvokerCertificate"], + "utf-8", + ) + ) + certification_file.close() + self.logger.info("Invoker onboarded and signed certificate created successfully") + return response_payload["apiInvokerId"] + except Exception as e: + self.logger.error(f"Error during onboarding Invoker to CAPIF: {e}") + raise + + def __write_to_file(self, api_invoker_id, discover_services_url): + self.logger.info("Writing API invoker ID and service discovery URL to file") + path = os.path.join(self.invoker_folder, self.capif_api_details_filename) + try: + with open( + path, "w" + ) as outfile: + json.dump( + { + "user_name": self.capif_invoker_username, + "api_invoker_id": api_invoker_id, + "discover_services_url": discover_services_url, + }, + outfile, + ) + self.logger.info("API invoker ID and service discovery URL written to file successfully") + except Exception as e: + self.logger.error(f"Error during writing to file: {e}") + raise + +class CAPIFProviderConnector: + """ + Τhis class is responsible for onboarding an exposer (eg. NEF emulator) to CAPIF + """ + def __init__(self, config_file: str): + """ + Inicializa el conector CAPIFProvider con los parámetros especificados en el archivo de configuración. + """ + # Cargar configuración desde archivo si es necesario + config_file = os.path.abspath(config_file) + self.config_path = os.path.dirname(config_file)+"/" + config = self.__load_config_file(config_file) + debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower() + if debug_mode=="false": debug_mode=False + # Inicializar logger + self.logger = logging.getLogger(self.__class__.__name__) + if debug_mode: + self.logger.setLevel(logging.DEBUG) + else: + self.logger.setLevel(logging.WARNING) + + + + + urllib_logger = logging.getLogger("urllib3") + if not debug_mode: + urllib_logger.setLevel(logging.WARNING) + else: + urllib_logger.setLevel(logging.DEBUG) + + + + + try: + + + provider_general_folder = os.path.abspath(os.getenv('PROVIDER_FOLDER', config.get('provider_folder', '')).strip()) + capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip() + capif_register_host = os.getenv('REGISTER_HOST', config.get('register_host', '')).strip() + capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip()) + capif_register_port = str(os.getenv('CAPIF_REGISTER_PORT', config.get('capif_register_port', '')).strip()) + capif_provider_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip() + capif_provider_password = os.getenv('CAPIF_PASSWORD', config.get('capif_password', '')).strip() + + csr_common_name = os.getenv('CSR_COMMON_NAME', config.get('csr_common_name', '')).strip() + csr_organizational_unit = os.getenv('CSR_ORGANIZATIONAL_UNIT', config.get('csr_organizational_unit', '')).strip() + csr_organization = os.getenv('CSR_ORGANIZATION', config.get('csr_organization', '')).strip() + crs_locality = os.getenv('CRS_LOCALITY', config.get('crs_locality', '')).strip() + csr_state_or_province_name = os.getenv('CSR_STATE_OR_PROVINCE_NAME', config.get('csr_state_or_province_name', '')).strip() + csr_country_name = os.getenv('CSR_COUNTRY_NAME', config.get('csr_country_name', '')).strip() + csr_email_address = os.getenv('CSR_EMAIL_ADDRESS', config.get('csr_email_address', '')).strip() + APFs = os.getenv('APFS', config.get('APFs', '')).strip() + AEFs = os.getenv('AEFS', config.get('AEFs', '')).strip() + + + if not capif_host: + self.logger.warning("CAPIF_HOST is not provided; defaulting to an empty string") + if not capif_provider_username: + self.logger.error("CAPIF_PROVIDER_USERNAME is required but not provided") + raise ValueError("CAPIF_PROVIDER_USERNAME is required") + + self.provider_folder = os.path.join(provider_general_folder, capif_provider_username) + os.makedirs(self.provider_folder, exist_ok=True) + + self.capif_host = capif_host.strip() + self.capif_provider_username = capif_provider_username + self.capif_provider_password = capif_provider_password + self.capif_register_host = capif_register_host + self.capif_register_port = capif_register_port + self.csr_common_name = csr_common_name + self.csr_organizational_unit = csr_organizational_unit + self.csr_organization = csr_organization + self.crs_locality = crs_locality + self.csr_state_or_province_name = csr_state_or_province_name + self.csr_country_name = csr_country_name + self.csr_email_address = csr_email_address + self.AEFs = int(AEFs) + self.APFs = int(APFs) + + + self.capif_https_port = str(capif_https_port) + + + if len(self.capif_https_port) == 0 or int(self.capif_https_port) == 443: + self.capif_https_url = f"https://{capif_host.strip()}/" + else: + self.capif_https_url = f"https://{capif_host.strip()}:{self.capif_https_port.strip()}/" + + if len(capif_register_port) == 0: + self.capif_register_url = f"https://{capif_register_host.strip()}:8084/" + else: + self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/" + + self.logger.info("CAPIFProviderConnector initialized with the config.json parameters") + + except Exception as e: + self.logger.error(f"Error during initialization: {e}") + raise + + + + def __store_certificate(self) -> None: + # Retrieves and stores the cert_server.pem from CAPIF. + self.logger.info("Retrieving capif_cert_server.pem, this may take a few minutes.") + + cmd = f"openssl s_client -connect {self.capif_host}:{self.capif_https_port} | openssl x509 -text > {self.provider_folder}/capif_cert_server.pem" + + try: + # Redirige la salida estándar y de errores a os.devnull para ocultar los logs + with open(os.devnull, 'w') as devnull: + subprocess.run(cmd, shell=True, check=True, stdout=devnull, stderr=devnull) + + cert_file = os.path.join(self.provider_folder, "capif_cert_server.pem") + if os.path.exists(cert_file) and os.path.getsize(cert_file) > 0: + self.logger.info("cert_server.pem successfully generated!") + else: + self.logger.error("Failed to generate cert_server.pem.") + raise FileNotFoundError(f"Certificate file not found at {cert_file}") + except subprocess.CalledProcessError as e: + self.logger.error(f"Command failed: {e}") + raise + except Exception as e: + self.logger.error(f"Error occurred: {e}") + raise + + + def __load_config_file(self, config_file: str): + """Carga el archivo de configuración.""" + try: + with open(config_file, 'r') as file: + return json.load(file) + except FileNotFoundError: + self.logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.") + return {} + + def __create_private_and_public_keys(self, api_prov_func_role) -> bytes: + """ + Creates private and public keys in the certificates folder. + :return: The contents of the public key + """ + private_key_path = os.path.join(self.provider_folder, f"{api_prov_func_role}_private_key.key") + csr_file_path = os.path.join(self.provider_folder, f"{api_prov_func_role}_public.csr") + + # Create key pair + key = PKey() + key.generate_key(TYPE_RSA, 2048) + + # Create CSR + req = X509Req() + subject = req.get_subject() + subject.CN = api_prov_func_role.lower() + subject.O = self.csr_organization + subject.OU = self.csr_organizational_unit + subject.L = self.crs_locality + subject.ST = self.csr_state_or_province_name + subject.C = self.csr_country_name + subject.emailAddress = self.csr_email_address + + req.set_pubkey(key) + req.sign(key, "sha256") + + # Write CSR and private key to files + with open(csr_file_path, "wb") as csr_file: + public_key = dump_certificate_request(FILETYPE_PEM, req) + csr_file.write(public_key) + + with open(private_key_path, "wb") as private_key_file: + private_key_file.write(dump_privatekey(FILETYPE_PEM, key)) + + return public_key + + def __onboard_exposer_to_capif(self, access_token, capif_onboarding_url): + self.logger.info("Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF") + + url = f"{self.capif_https_url}{capif_onboarding_url}" + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # Crear la lista de roles sin indexar + roles = ["AMF"] + for n in range(1, self.AEFs + 1): + roles.append("AEF") + + for n in range(1, self.APFs + 1): + roles.append("APF") + + # Construir el payload con los roles sin indexar + payload = { + "apiProvFuncs": [ + {"regInfo": {"apiProvPubKey": ""}, "apiProvFuncRole": role, "apiProvFuncInfo": f"{role.lower()}"} + for role in roles + ], + "apiProvDomInfo": "This is provider", + "suppFeat": "fff", + "failReason": "string", + "regSec": access_token, + } + + # Generar los roles indexados para la creación de certificados + indexedroles = ["AMF"] + for n in range(1, self.AEFs + 1): + indexedroles.append(f"AEF-{n}") + + for n in range(1, self.APFs + 1): + indexedroles.append(f"APF-{n}") + + # Guardar las claves públicas y generar los certificados con roles indexados + for i, api_func in enumerate(payload["apiProvFuncs"]): + # Generar las claves públicas con el rol indexado, pero no actualizar el payload con el rol indexado + public_key = self.__create_private_and_public_keys(indexedroles[i]) + + # Asignar la clave pública al payload + api_func["regInfo"]["apiProvPubKey"] = public_key.decode("utf-8") + + + try: + response = requests.post( + url, + headers=headers, + data=json.dumps(payload), + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + response.raise_for_status() + self.logger.info("Provider onboarded and signed certificate obtained successfully") + return response.json() + except requests.exceptions.RequestException as e: + self.logger.error(f"Onboarding failed: {e}") + raise + + + def __write_to_file(self, onboarding_response, capif_registration_id, publish_url): + self.logger.info("Saving the most relevant onboarding data") + + # Generar los roles indexados para la correspondencia + indexedroles = ["AMF"] + for n in range(1, self.AEFs + 1): + indexedroles.append(f"AEF-{n}") + + for n in range(1, self.APFs + 1): + indexedroles.append(f"APF-{n}") + + # Guardar los certificados con los nombres indexados + for i, func_profile in enumerate(onboarding_response["apiProvFuncs"]): + role = indexedroles[i].lower() + cert_path = os.path.join(self.provider_folder, f"{role}.crt") + with open(cert_path, "wb") as cert_file: + cert_file.write(func_profile["regInfo"]["apiProvCert"].encode("utf-8")) + + # Guardar los detalles del proveedor + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + with open(provider_details_path, "w") as outfile: + data = { + "capif_registration_id": capif_registration_id, + "publish_url": publish_url, + **{f"{indexedroles[i]}_api_prov_func_id": api_prov_func["apiProvFuncId"] + for i, api_prov_func in enumerate(onboarding_response["apiProvFuncs"])} + } + json.dump(data, outfile, indent=4) + + self.logger.info("Data saved") + + + + + def __save_capif_ca_root_file_and_get_auth_token(self): + url = f"{self.capif_register_url}getauth" + self.logger.info("Saving CAPIF CA root file and getting auth token with user and password given by the CAPIF administrator") + + try: + response = requests.get( + url, + headers={"Content-Type": "application/json"}, + auth=HTTPBasicAuth(self.capif_provider_username, self.capif_provider_password), + verify=False + ) + response.raise_for_status() + + self.logger.info("Authorization acquired successfully") + + response_payload = response.json() + ca_root_file_path = os.path.join(self.provider_folder, "ca.crt") + + with open(ca_root_file_path, "wb") as ca_root_file: + ca_root_file.write(response_payload["ca_root"].encode("utf-8")) + + self.logger.info("CAPIF CA root file saved and auth token obtained successfully") + return response_payload + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error acquiring authorization: {e}") + raise + + + def register_and_onboard_provider(self) -> None: + """ + Retrieves and stores the certificate from CAPIF, acquires authorization, and registers the provider. + """ + # Store the certificate + self.__store_certificate() + + # Retrieve CA root file and get authorization token + capif_postauth_info = self.__save_capif_ca_root_file_and_get_auth_token() + + # Extract necessary information + capif_onboarding_url = capif_postauth_info["ccf_api_onboarding_url"] + access_token = capif_postauth_info["access_token"] + ccf_publish_url = capif_postauth_info["ccf_publish_url"] + + # Onboard provider to CAPIF + onboarding_response = self.__onboard_exposer_to_capif( + access_token, capif_onboarding_url + ) + + # Save onboarding details to file + capif_registration_id = onboarding_response["apiProvDomId"] + self.__write_to_file( + onboarding_response, capif_registration_id, ccf_publish_url + ) + + + + def publish_services(self, service_api_description_json_full_path: str) -> dict: + """ + Publishes services to CAPIF and returns the published services dictionary. + + :param service_api_description_json_full_path: The full path of the service_api_description.json containing + the endpoints to be published. + :return: The published services dictionary that was saved in CAPIF. + """ + self.logger.info("Starting the service publication process") + + # Load provider details + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + self.logger.info(f"Loading provider details from {provider_details_path}") + + provider_details=self.__load_provider_api_details() + publish_url=provider_details["publish_url"] + + json_path = self.config_path + "publish.json" + + + # Leer el archivo publish.json + with open(json_path, 'r') as f: + chosenAPFsandAEFs = json.load(f) + + APF_api_prov_func_id = chosenAPFsandAEFs["publisherAPFid"] + AEFs_list = chosenAPFsandAEFs["publisherAEFsids"] + + apf_number = None + for key, value in provider_details.items(): + if value == APF_api_prov_func_id and key.startswith("APF-"): + apf_inter = key.split("-")[1] + apf_number= apf_inter.split("_")[0] # Obtener el número del APF + break + + if apf_number is None: + self.logger.error(f"No matching APF found for publisherAPFid: {APF_api_prov_func_id}") + raise ValueError("Invalid publisherAPFid") + + # Leer y modificar la descripción de la API de servicio + self.logger.info(f"Reading and modifying service API description from {service_api_description_json_full_path}") + + try: + with open(service_api_description_json_full_path, "r") as service_file: + data = json.load(service_file) + + # Verificamos que el número de AEFs coincide con el número de perfiles + if len(AEFs_list) != len(data.get("aefProfiles", [])): + self.logger.error("The number of AEFs in publisherAEFsids does not match the number of profiles in aefProfiles") + raise ValueError("Mismatch between number of AEFs and profiles") + + # Asignamos los AEFs correspondientes + for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): + profile["aefId"] = aef_id + + self.logger.info("Service API description modified successfully") + + # Guardamos los cambios en el archivo + with open(service_api_description_json_full_path, "w") as service_file: + json.dump(data, service_file, indent=4) + + except FileNotFoundError: + self.logger.error(f"Service API description file not found: {service_api_description_json_full_path}") + raise + except json.JSONDecodeError as e: + self.logger.error(f"Error decoding JSON from file {service_api_description_json_full_path}: {e}") + raise + except ValueError as e: + self.logger.error(f"Error with the input data: {e}") + raise + + # Publish services + url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}" + cert = ( + os.path.join(self.provider_folder, f"apf-{apf_number}.crt"), + os.path.join(self.provider_folder, f"apf-{apf_number}_private_key.key"), + ) + + self.logger.info(f"Publishing services to URL: {url}") + + try: + response = requests.post( + url, + headers={"Content-Type": "application/json"}, + data=json.dumps(data), + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + response.raise_for_status() + self.logger.info("Services published successfully") + + # Save response to file + capif_response_text = response.text + + capif_response_json=json.loads(capif_response_text) + + file_name = capif_response_json.get("apiName", "default_name") # Default name if apiName is missing + id=capif_response_json.get("apiId","default_id") + output_path = os.path.join(self.provider_folder, f"CAPIF-{file_name}-{id}-api.json") + + + + with open(output_path, "w") as outfile: + outfile.write(capif_response_text) + self.logger.info(f"CAPIF response saved to {output_path}") + output_path = os.path.join(self.provider_folder, "Published-Apis.json") + + # Leer el archivo existente de APIs publicados + published_apis = {} + if os.path.exists(output_path): + with open(output_path, "r") as outfile: + published_apis = json.load(outfile) + + # Agregar el nuevo API publicado + published_apis[file_name] = id + + # Escribir el archivo actualizado de APIs publicados + with open(output_path, "w") as outfile: + json.dump(published_apis, outfile, indent=4) + self.logger.info(f"API '{file_name}' with ID '{id}' added to Published Apis.") + return json.loads(capif_response_text) + + except requests.RequestException as e: + self.logger.error(f"Request to CAPIF failed: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error during service publication: {e}") + raise + + + def unpublish_service(self) -> dict: + """ + Publishes services to CAPIF and returns the published services dictionary. + + :param service_api_description_json_full_path: The full path of the service_api_description.json containing + the endpoints to be published. + :return: The published services dictionary that was saved in CAPIF. + """ + self.logger.info("Starting the service unpublication process") + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + self.logger.info(f"Loading provider details from {provider_details_path}") + + provider_details=self.__load_provider_api_details() + publish_url=provider_details["publish_url"] + + # Load provider details + json_path = self.config_path +"publish.json" + with open(json_path, 'r') as f: + publish = json.load(f) + api_id="/" + publish["serviceApiId"] + APF_api_prov_func_id=publish["publisherAPFid"] + AEFs_list = publish["publisherAEFsids"] + apf_number = None + for key, value in provider_details.items(): + if value == APF_api_prov_func_id and key.startswith("APF-"): + apf_inter = key.split("-")[1] + apf_number= apf_inter.split("_")[0] # Obtener el número del APF + break + + if apf_number is None: + self.logger.error(f"No matching APF found for publisherAPFid: {APF_api_prov_func_id}") + raise ValueError("Invalid publisherAPFid") + + + self.logger.info(f"Loading provider details from {provider_details_path}") + + url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}" + + cert = ( + os.path.join(self.provider_folder, f"apf-{apf_number}.crt"), + os.path.join(self.provider_folder, f"apf-{apf_number}_private_key.key"), + ) + + + self.logger.info(f"Unpublishing service to URL: {url}") + + try: + response = requests.delete( + url, + headers={"Content-Type": "application/json"}, + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + + response.raise_for_status() + + directory = self.provider_folder + + # Iterar sobre todos los archivos en el directorio + for filename in os.listdir(directory): + path = os.path.join(directory, filename) + + # Verificar si el archivo empieza con 'CAPIF-' + + if filename.startswith("CAPIF-") and publish["serviceApiId"] in filename: + + os.remove(path) # Salir del bucle si el archivo es eliminado + break + + output_path = os.path.join(self.provider_folder, "Published-Apis.json") + + # Leer el archivo existente de APIs publicados + published_apis = {} + if os.path.exists(output_path): + with open(output_path, "r") as outfile: + published_apis = json.load(outfile) + + # ID del API que deseas eliminar + api_id_to_delete = publish["serviceApiId"] # Reemplaza con el ID especÃfico + + # Buscar y eliminar el API por su ID + api_name_to_delete = None + for name, id in published_apis.items(): + if id == api_id_to_delete: + api_name_to_delete = name + break + + if api_name_to_delete: + del published_apis[api_name_to_delete] + self.logger.info(f"API with ID '{api_id_to_delete}' removed from Published Apis.") + else: + self.logger.warning(f"API with ID '{api_id_to_delete}' not found in Published Apis.") + + # Escribir el archivo actualizado de APIs publicados + with open(output_path, "w") as outfile: + json.dump(published_apis, outfile, indent=4) + + self.logger.info("Services unpublished successfully") + + + except requests.RequestException as e: + self.logger.error(f"Request to CAPIF failed: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error during service unpublication: {e}") + raise + + def get_service(self) -> dict: + """ + Publishes services to CAPIF and returns the published services dictionary. + + :param service_api_description_json_full_path: The full path of the service_api_description.json containing + the endpoints to be published. + :return: The published services dictionary that was saved in CAPIF. + """ + self.logger.info("Starting the service unpublication process") + + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + self.logger.info(f"Loading provider details from {provider_details_path}") + + provider_details=self.__load_provider_api_details() + publish_url=provider_details["publish_url"] + + json_path = self.config_path + "publish.json" + + + # Leer el archivo publish.json + with open(json_path, 'r') as f: + chosenAPFsandAEFs = json.load(f) + + APF_api_prov_func_id = chosenAPFsandAEFs["publisherAPFid"] + + api_id="/" +chosenAPFsandAEFs["serviceApiId"] + + apf_number = None + for key, value in provider_details.items(): + if value == APF_api_prov_func_id and key.startswith("APF-"): + apf_inter = key.split("-")[1] + apf_number= apf_inter.split("_")[0] # Obtener el número del APF + break + + if apf_number is None: + self.logger.error(f"No matching APF found for publisherAPFid: {APF_api_prov_func_id}") + raise ValueError("Invalid publisherAPFid") + + url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}" + + cert = ( + os.path.join(self.provider_folder, f"apf-{apf_number}.crt"), + os.path.join(self.provider_folder, f"apf-{apf_number}_private_key.key"), + ) + + + self.logger.info(f"Getting service to URL: {url}") + + try: + response = requests.get( + url, + headers={"Content-Type": "application/json"}, + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + + response.raise_for_status() + + self.logger.info("Service received successfully") + path=os.path.join(self.provider_folder,"service_received.json") + with open(path, 'w') as f: + json_data = json.loads(response.text) + json.dump(json_data,f,indent=4) + self.logger.info(f"Service saved in {path}") + + + + + except requests.RequestException as e: + self.logger.error(f"Request to CAPIF failed: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error during service getter: {e}") + raise + + def get_all_services(self) -> dict: + """ + Publishes services to CAPIF and returns the published services dictionary. + + :param service_api_description_json_full_path: The full path of the service_api_description.json containing + the endpoints to be published. + :return: The published services dictionary that was saved in CAPIF. + """ + self.logger.info("Starting the service publication process") + + # Load provider details + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + self.logger.info(f"Loading provider details from {provider_details_path}") + + provider_details=self.__load_provider_api_details() + publish_url=provider_details["publish_url"] + + json_path = self.config_path + "publish.json" + + + # Leer el archivo publish.json + with open(json_path, 'r') as f: + chosenAPFsandAEFs = json.load(f) + + APF_api_prov_func_id = chosenAPFsandAEFs["publisherAPFid"] + + + apf_number = None + for key, value in provider_details.items(): + if value == APF_api_prov_func_id and key.startswith("APF-"): + apf_inter = key.split("-")[1] + apf_number= apf_inter.split("_")[0] # Obtener el número del APF + break + + if apf_number is None: + self.logger.error(f"No matching APF found for publisherAPFid: {APF_api_prov_func_id}") + raise ValueError("Invalid publisherAPFid") + + # Leer y modificar la descripción de la API de servicio + + + # Publish services + url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}" + cert = ( + os.path.join(self.provider_folder, f"apf-{apf_number}.crt"), + os.path.join(self.provider_folder, f"apf-{apf_number}_private_key.key"), + ) + + + self.logger.info(f"Getting services to URL: {url}") + + try: + response = requests.get( + url, + headers={"Content-Type": "application/json"}, + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + response.raise_for_status() + self.logger.info("Services received successfully") + + path=os.path.join(self.provider_folder,"service_received.json") + with open(path, 'w') as f: + json_data = json.loads(response.text) + json.dump(json_data,f,indent=4) + self.logger.info(f"Services saved in {path}") + + # Save response to file + + + + + except requests.RequestException as e: + self.logger.error(f"Request to CAPIF failed: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error during services reception: {e}") + raise + + def update_service(self, service_api_description_json_full_path: str) -> dict: + """ + Publishes services to CAPIF and returns the published services dictionary. + + :param service_api_description_json_full_path: The full path of the service_api_description.json containing + the endpoints to be published. + :return: The published services dictionary that was saved in CAPIF. + """ + self.logger.info("Starting the service publication process") + + # Load provider details + # Load provider details + provider_details_path = os.path.join(self.provider_folder, "capif_provider_details.json") + self.logger.info(f"Loading provider details from {provider_details_path}") + + provider_details=self.__load_provider_api_details() + publish_url=provider_details["publish_url"] + + json_path = self.config_path + "publish.json" + + + # Leer el archivo publish.json + with open(json_path, 'r') as f: + chosenAPFsandAEFs = json.load(f) + + APF_api_prov_func_id = chosenAPFsandAEFs["publisherAPFid"] + AEFs_list = chosenAPFsandAEFs["publisherAEFsids"] + + apf_number = None + for key, value in provider_details.items(): + if value == APF_api_prov_func_id and key.startswith("APF-"): + apf_inter = key.split("-")[1] + apf_number= apf_inter.split("_")[0] # Obtener el número del APF + break + + if apf_number is None: + self.logger.error(f"No matching APF found for publisherAPFid: {APF_api_prov_func_id}") + raise ValueError("Invalid publisherAPFid") + + # Leer y modificar la descripción de la API de servicio + self.logger.info(f"Reading and modifying service API description from {service_api_description_json_full_path}") + + try: + with open(service_api_description_json_full_path, "r") as service_file: + data = json.load(service_file) + + # Verificamos que el número de AEFs coincide con el número de perfiles + if len(AEFs_list) != len(data.get("aefProfiles", [])): + self.logger.error("The number of AEFs in publisherAEFsids does not match the number of profiles in aefProfiles") + raise ValueError("Mismatch between number of AEFs and profiles") + + # Asignamos los AEFs correspondientes + for profile, aef_id in zip(data.get("aefProfiles", []), AEFs_list): + profile["aefId"] = aef_id + + self.logger.info("Service API description modified successfully") + + # Guardamos los cambios en el archivo + with open(service_api_description_json_full_path, "w") as service_file: + json.dump(data, service_file, indent=4) + + except FileNotFoundError: + self.logger.error(f"Service API description file not found: {service_api_description_json_full_path}") + raise + except json.JSONDecodeError as e: + self.logger.error(f"Error decoding JSON from file {service_api_description_json_full_path}: {e}") + raise + except ValueError as e: + self.logger.error(f"Error with the input data: {e}") + raise + api_id="/" +chosenAPFsandAEFs["serviceApiId"] + # Publish services + url = f"{self.capif_https_url}{publish_url.replace('<apfId>', APF_api_prov_func_id)}{api_id}" + cert = ( + os.path.join(self.provider_folder, f"apf-{apf_number}.crt"), + os.path.join(self.provider_folder, f"apf-{apf_number}_private_key.key"), + ) + + + self.logger.info(f"Publishing services to URL: {url}") + + try: + response = requests.put( + url, + headers={"Content-Type": "application/json"}, + data=json.dumps(data), + cert=cert, + verify=os.path.join(self.provider_folder, "ca.crt"), + ) + response.raise_for_status() + self.logger.info("Services updated successfully") + + # Save response to file + capif_response_text = response.text + + capif_response_json=json.loads(capif_response_text) + + file_name = capif_response_json.get("apiName", "default_name") # Default name if apiName is missing + id=capif_response_json.get("apiId","default_id") + output_path = os.path.join(self.provider_folder, f"CAPIF-{file_name}-{id}-api.json") + + + + with open(output_path, "w") as outfile: + outfile.write(capif_response_text) + self.logger.info(f"CAPIF response saved to {output_path}") + output_path = os.path.join(self.provider_folder, "Published-Apis.json") + + # Leer el archivo existente de APIs publicados + published_apis = {} + if os.path.exists(output_path): + with open(output_path, "r") as outfile: + published_apis = json.load(outfile) + + # Agregar el nuevo API publicado + published_apis[file_name] = id + + # Escribir el archivo actualizado de APIs publicados + with open(output_path, "w") as outfile: + json.dump(published_apis, outfile, indent=4) + self.logger.info(f"API '{file_name}' with ID '{id}' added to Published Apis.") + return json.loads(capif_response_text) + except requests.RequestException as e: + self.logger.error(f"Request to CAPIF failed: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error during service publication: {e}") + raise + + def offboard_and_deregister_nef(self) -> None: + """ + Offboards and deregisters the NEF (Network Exposure Function). + """ + try: + self.offboard_nef() + self.__remove_files() + self.logger.info("Provider offboarded and deregistered successfully.") + except Exception as e: + self.logger.error(f"Failed to offboard and deregister Provider: {e}") + raise + + def offboard_nef(self) -> None: + """ + Offboards the NEF (Network Exposure Function) from CAPIF. + """ + try: + self.logger.info("Offboarding the provider") + + # Load CAPIF API details + capif_api_details = self.__load_provider_api_details() + url = f"{self.capif_https_url}api-provider-management/v1/registrations/{capif_api_details['capif_registration_id']}" + + # Define certificate paths + cert_paths = ( + os.path.join(self.provider_folder, "amf.crt"), + os.path.join(self.provider_folder, "AMF_private_key.key") + ) + + # Send DELETE request to offboard the provider + response = requests.delete( + url, + cert=cert_paths, + verify=os.path.join(self.provider_folder, "ca.crt") + ) + + response.raise_for_status() + self.logger.info("Offboarding performed successfully") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error offboarding NEF: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error: {e}") + raise + + def __remove_files(self): + self.logger.info("Removing files generated") + try: + folder_path = self.provider_folder + + if os.path.exists(folder_path): + # Elimina todo el contenido dentro de la carpeta, incluyendo archivos y subcarpetas + for root, dirs, files in os.walk(folder_path): + for file in files: + os.remove(os.path.join(root, file)) + for dir in dirs: + shutil.rmtree(os.path.join(root, dir)) + os.rmdir(folder_path) + self.logger.info(f"All contents in {folder_path} removed successfully.") + else: + self.logger.warning(f"Folder {folder_path} does not exist.") + except Exception as e: + self.logger.error(f"Error during removing folder contents: {e}") + raise + + def __load_provider_api_details(self) -> dict: + """ + Loads NEF API details from the CAPIF provider details JSON file. + + :return: A dictionary containing NEF API details. + :raises FileNotFoundError: If the CAPIF provider details file is not found. + :raises json.JSONDecodeError: If there is an error decoding the JSON file. + """ + file_path = os.path.join(self.provider_folder, "capif_provider_details.json") + + try: + with open(file_path, "r") as file: + return json.load(file) + except FileNotFoundError: + self.logger.error(f"File not found: {file_path}") + raise + except json.JSONDecodeError as e: + self.logger.error(f"Error decoding JSON from file {file_path}: {e}") + raise + except Exception as e: + self.logger.error(f"Unexpected error while loading NEF API details: {e}") + raise + + + + +class ServiceDiscoverer: + class ServiceDiscovererException(Exception): + pass + + def __init__( + self, + config_file + ): + # Cargar configuración desde archivo si es necesario + config_file = os.path.abspath(config_file) + config = self.__load_config_file(config_file) + debug_mode = os.getenv('DEBUG_MODE', config.get('debug_mode', 'False')).strip().lower() + if debug_mode=="false": debug_mode=False + + # Inicializar logger + self.logger = logging.getLogger(self.__class__.__name__) + if debug_mode: + self.logger.setLevel(logging.DEBUG) + else: + self.logger.setLevel(logging.WARNING) + + + + + urllib_logger = logging.getLogger("urllib3") + if not debug_mode: + urllib_logger.setLevel(logging.WARNING) + else: + urllib_logger.setLevel(logging.DEBUG) + + self.config_path = os.path.dirname(config_file)+"/" + capif_host = os.getenv('CAPIF_HOST', config.get('capif_host', '')).strip() + capif_https_port = str(os.getenv('CAPIF_HTTPS_PORT', config.get('capif_https_port', '')).strip()) + invoker_general_folder = os.path.abspath(os.getenv('invoker_folder', config.get('invoker_folder', '')).strip()) + + capif_invoker_username = os.getenv('CAPIF_USERNAME', config.get('capif_username', '')).strip() + + + self.capif_invoker_username=capif_invoker_username + self.capif_host = capif_host + self.capif_https_port = capif_https_port + self.invoker_folder = os.path.join( + invoker_general_folder, capif_invoker_username + ) + os.makedirs(self.invoker_folder, exist_ok=True) + self.capif_api_details = self.__load_provider_api_details() + + self.signed_key_crt_path = os.path.join( + self.invoker_folder + ,self.capif_api_details["user_name"] + ".crt" + ) + self.private_key_path = os.path.join(self.invoker_folder ,"private.key") + self.ca_root_path = os.path.join(self.invoker_folder , "ca.crt") + + self.logger.info("ServiceDiscoverer initialized correctly") + + def get_api_provider_id(self): + return self.capif_api_details["api_provider_id"] + + def __load_config_file(self, config_file: str): + """Carga el archivo de configuración.""" + try: + with open(config_file, 'r') as file: + return json.load(file) + except FileNotFoundError: + self.logger.warning(f"Configuration file {config_file} not found. Using defaults or environment variables.") + return {} + + + def __load_provider_api_details(self): + try: + path=os.path.join(self.invoker_folder,"capif_api_security_context_details-"+self.capif_invoker_username+".json") + with open( + path, + "r", + ) as openfile: + details = json.load(openfile) + self.logger.info("Api provider details correctly loaded") + return details + except Exception as e: + self.logger.error("Error while loading Api invoker details: %s", str(e)) + raise + + def _add_trailing_slash_to_url_if_missing(self, url): + if not url.endswith("/"): + url += "/" + return url + + def get_security_context(self): + self.logger.info("Getting security context for all API's filtered") + + + + self.logger.info("Trying to update security context") + self.__update_security_service() + self.__cache_security_context() + + + def get_access_token(self): + """ + :param api_name: El nombre del API devuelto por descubrir servicios + :param api_id: El id del API devuelto por descubrir servicios + :param aef_id: El aef_id relevante devuelto por descubrir servicios + :return: El token de acceso (jwt) + """ + token_dic = self.__get_security_token() + self.logger.info("Access token successfully obtained") + return token_dic["access_token"] + + + + def __cache_security_context(self): + try: + path=os.path.join(self.invoker_folder,"capif_api_security_context_details-"+self.capif_invoker_username+".json") + with open( + path, "w" + ) as outfile: + json.dump(self.capif_api_details, outfile) + self.logger.info("Security context saved correctly") + except Exception as e: + self.logger.error("Error when saving the security context: %s", str(e)) + raise + + def __update_security_service(self): + """ + Actualiza el servicio de seguridad. + + :param api_id: El id del API devuelto por descubrir servicios. + :param aef_id: El aef_id devuelto por descubrir servicios. + :return: None. + """ + url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/trustedInvokers/{self.capif_api_details['api_invoker_id']}/update" + payload = { + "securityInfo": [], + "notificationDestination": "https://mynotificationdest.com", + "requestTestNotification": True, + "websockNotifConfig": { + "websocketUri": "string", + "requestWebsocketUri": True + }, + "supportedFeatures": "fff" + } + + number_of_apis = len(self.capif_api_details["registered_security_contexes"]) + + for i in range(0, number_of_apis): + # Obteniendo los valores de api_id y aef_id para cada API + api_id = self.capif_api_details["registered_security_contexes"][i]["api_id"] + aef_id = self.capif_api_details["registered_security_contexes"][i]["aef_id"] + + security_info = { + "prefSecurityMethods": ["Oauth"], + "authenticationInfo": "string", + "authorizationInfo": "string", + "aefId": aef_id, + "apiId": api_id + } + + payload["securityInfo"].append(security_info) + + try: + response = requests.post(url, + json=payload, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=self.ca_root_path) + response.raise_for_status() + self.logger.info("Security context correctly updated") + + except requests.exceptions.HTTPError as http_err: + if response.status_code == 404: + self.logger.warning("Received 404 error, redirecting to register security service") + self.__register_security_service() + else: + self.logger.error("HTTP error occurred: %s", str(http_err)) + raise + + except requests.RequestException as e: + self.logger.error("Error trying to update Security context: %s", str(e)) + raise + + + def __register_security_service(self): + """ + :param api_id: El id del API devuelto por descubrir servicios + :param aef_id: El aef_id devuelto por descubrir servicios + :return: None + """ + + url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/trustedInvokers/{self.capif_api_details['api_invoker_id']}" + payload = { + "securityInfo": [], + "notificationDestination": "https://mynotificationdest.com", + "requestTestNotification": True, + "websockNotifConfig": { + "websocketUri": "string", + "requestWebsocketUri": True + }, + "supportedFeatures": "fff" + } + + number_of_apis = len(self.capif_api_details["registered_security_contexes"]) + + + + for i in range(0,number_of_apis): + # Obteniendo los valores de api_id y aef_id para cada API + api_id = self.capif_api_details["registered_security_contexes"][i]["api_id"] + aef_id = self.capif_api_details["registered_security_contexes"][i]["aef_id"] + + security_info = { + "prefSecurityMethods": ["Oauth"], + "authenticationInfo": "string", + "authorizationInfo": "string", + "aefId": aef_id, + "apiId": api_id + } + + payload["securityInfo"].append(security_info) + + + try: + response = requests.put(url, + json=payload, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=self.ca_root_path + ) + response.raise_for_status() + self.logger.info("Security service properly registered") + except requests.RequestException as e: + self.logger.error("Error when registering the security service: %s", str(e)) + raise + + def __get_security_token(self): + """ + :param api_name: El nombre del API devuelto por descubrir servicios + :param aef_id: El aef_id relevante devuelto por descubrir servicios + :return: El token de acceso (jwt) + """ + url = f"https://{self.capif_host}:{self.capif_https_port}/capif-security/v1/securities/{self.capif_api_details['api_invoker_id']}/token" + # Construir el scope concatenando aef_id y api_name separados por un ';' + scope_parts = [] + + # Iterar sobre los contextos registrados y construir las partes del scope + for context in self.capif_api_details["registered_security_contexes"]: + aef_id = context["aef_id"] + api_name = context["api_name"] + scope_parts.append(f"{aef_id}:{api_name}") + + # Unir todas las partes del scope con ';' y añadir el prefijo '3gpp#' + scope = "3gpp#" + ";".join(scope_parts) + + + payload = { + "grant_type": "client_credentials", + "client_id": self.capif_api_details["api_invoker_id"], + "client_secret": "string", + "scope": scope + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + } + + try: + response = requests.post(url, + headers=headers, + data=payload, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=self.ca_root_path + ) + response.raise_for_status() + response_payload = response.json() + self.logger.info("Security token successfully obtained") + return response_payload + except requests.RequestException as e: + self.logger.error("Error obtaining the security token: %s", str(e)) + raise + + + def discover_service_apis(self): + """ + Descubre los APIs de servicio desde CAPIF con filtros basados en un archivo JSON. + :return: Payload JSON con los detalles de los APIs de servicio + """ + # Cargar los parámetros desde el archivo JSON + + json_path = self.config_path +"discover_filter.json" + with open(json_path, 'r') as f: + filters = json.load(f) + + # Filtrar parámetros que no sean vacÃos " + query_params = {k: v for k, v in filters.items() if v } + + # Formar la URL con los parámetros de query + query_string = "&".join([f"{k}={v}" for k, v in query_params.items()]) + + url = f"https://{self.capif_host}:{self.capif_https_port}/{self.capif_api_details['discover_services_url']}{self.capif_api_details['api_invoker_id']}" + + if query_string: + url += f"&{query_string}" + + try: + response = requests.get( + url, + headers={"Content-Type": "application/json"}, + cert=(self.signed_key_crt_path, self.private_key_path), + verify=self.ca_root_path + ) + + response.raise_for_status() + response_payload = response.json() + self.logger.info("Service APIs successfully discovered") + return response_payload + except requests.RequestException as e: + self.logger.error("Error discovering service APIs: %s", str(e)) + raise + + def retrieve_api_description_by_name(self, api_name): + """ + Recupera la descripción del API por nombre. + :param api_name: Nombre del API + :return: Descripción del API + """ + self.logger.info("Retrieving the API description for api_name=%s", api_name) + capif_apifs = self.discover_service_apis() + endpoints = [api for api in capif_apifs["serviceAPIDescriptions"] if api["apiName"] == api_name] + if not endpoints: + error_message = ( + f"Could not find available endpoints for api_name: {api_name}. " + "Make sure that a) your Invoker is registered and onboarded to CAPIF and " + "b) the NEF emulator has been registered and onboarded to CAPIF" + ) + self.logger.error(error_message) + raise ServiceDiscoverer.ServiceDiscovererException(error_message) + else: + self.logger.info("API description successfully retrieved") + return endpoints[0] + + def retrieve_specific_resource_name(self, api_name, resource_name): + """ + Recupera la URL para recursos especÃficos dentro de los APIs. + :param api_name: Nombre del API + :param resource_name: Nombre del recurso + :return: URL del recurso especÃfico + """ + self.logger.info("Retrieving the URL for resource_name=%s in api_name=%s", resource_name, api_name) + api_description = self.retrieve_api_description_by_name(api_name) + version_dictionary = api_description["aefProfiles"][0]["versions"][0] + version = version_dictionary["apiVersion"] + resources = version_dictionary["resources"] + uris = [resource["uri"] for resource in resources if resource["resourceName"] == resource_name] + + if not uris: + error_message = f"Could not find resource_name: {resource_name} at api_name {api_name}" + self.logger.error(error_message) + raise ServiceDiscoverer.ServiceDiscovererException(error_message) + else: + uri = uris[0] + if not uri.startswith("/"): + uri = "/" + uri + if api_name.endswith("/"): + api_name = api_name[:-1] + result_url = api_name + "/" + version + uri + self.logger.info("URL of the specific resource successfully retrieved: %s", result_url) + return result_url + + def save_security_token(self,token): + self.capif_api_details["access_token"]=token + self.__cache_security_context() + + def get_tokens(self): + + self.get_security_context() + token=self.get_access_token() + self.save_security_token(token) + + + def discover(self): + endpoints = self.discover_service_apis() + + if len(endpoints) > 0: + self.save_api_discovered(endpoints) + else: + self.logger.error("No endpoints have been registered. Make sure a Provider has Published an API to CAPIF first") + + def save_api_discovered(self,endpoints): + self.capif_api_details["registered_security_contexes"] = [] + for service in endpoints["serviceAPIDescriptions"]: + api_name = service["apiName"] + api_id = service["apiId"] + for n in service["aefProfiles"]: + aef_id=n["aefId"] + self.capif_api_details["registered_security_contexes"].append({"api_name":api_name,"api_id": api_id, "aef_id": aef_id}) + self.save_api_details() + + import json + + def save_api_details(self): + try: + # Define the path to save the details + file_path = os.path.join(self.invoker_folder , "capif_api_security_context_details-" + self.capif_invoker_username + ".json") + + # Save the details as a JSON file + with open(file_path, "w") as outfile: + json.dump(self.capif_api_details, outfile, indent=4) + + # Log the success of the operation + self.logger.info("API provider details correctly saved") + + except Exception as e: + # Log any errors that occur during the save process + self.logger.error("Error while saving API provider details: %s", str(e)) + raise + + + diff --git a/images/Flujo completo-OPENCAPIF ACTUAL.jpg b/images/Flujo completo-OPENCAPIF ACTUAL.jpg new file mode 100644 index 0000000000000000000000000000000000000000..960e4211c929c729994bde46ef81e5458ddb7e4e Binary files /dev/null and b/images/Flujo completo-OPENCAPIF ACTUAL.jpg differ diff --git a/images/Flujo completo-SDK ACTUAL CON REGISTER.jpg b/images/Flujo completo-SDK ACTUAL CON REGISTER.jpg new file mode 100644 index 0000000000000000000000000000000000000000..249dd95f7f508893e0f8976d053f4546af349178 Binary files /dev/null and b/images/Flujo completo-SDK ACTUAL CON REGISTER.jpg differ