Loading network_app_samples/network_app_provider_sample/network_app_provider.py +0 −329 Original line number Diff line number Diff line import threading from opencapif_sdk import capif_provider_connector from OpenSSL import crypto from flask_jwt_extended import jwt_required, JWTManager from flask import Flask, jsonify, request, redirect import json from werkzeug import serving capif_sdk_config_path = "./testing_CICD_capif_sdk_config_sample.json" resilience_app = Flask("resilience_app") jwt_flask = JWTManager(resilience_app) def config_resilience(): # JWT Configuration resilience_app.config['JWT_ALGORITHM'] = 'RS256' resilience_app.config['JWT_PUBLIC_KEY'] = "YourPublicKeyHere" with open("/sdk/certificates/capif_cert_server.pem", "rb") as cert_file: cert = cert_file.read() crtObj = crypto.load_certificate(crypto.FILETYPE_PEM, cert) pubKeyObject = crtObj.get_pubkey() pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM, pubKeyObject) resilience_app.config['JWT_ALGORITHM'] = 'RS256' resilience_app.config['JWT_PUBLIC_KEY'] = pubKeyString # Dummy data for resilience management (In practice, this would connect to a database or service) resilience_profiles = { "1": {"profileId": 1, "status": "active"}, "2": {"profileId": 2, "status": "inactive"} } @resilience_app.route("/resilience", methods=["GET", "POST"]) @jwt_required() def manage_resilience(): """Endpoint to manage resilience functionalities in 6G networks""" if request.method == "GET": return jsonify({"message": "GET request for resilience management received."}) if request.method == "POST": data = request.get_json() return jsonify({"message": "Resilience management POST request received", "data": data}) @resilience_app.route("/resilience/<profileId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_resilience(profileId): """Endpoint to manage a single resilience profile""" if request.method == "GET": profile = resilience_profiles.get(profileId, None) if profile: return jsonify({"profile": profile}) else: return jsonify({"message": f"Profile {profileId} not found"}), 404 if request.method == "PUT": data = request.get_json() resilience_profiles[profileId] = data return jsonify({"message": f"Profile {profileId} updated", "data": data}) if request.method == "DELETE": if profileId in resilience_profiles: del resilience_profiles[profileId] return jsonify({"message": f"Profile {profileId} deleted"}) else: return jsonify({"message": f"Profile {profileId} not found"}), 404 @resilience_app.route("/slice/resilience", methods=["GET", "POST"]) @jwt_required() def manage_slice_resilience(): """Endpoint for managing resilience in network slices""" if request.method == "GET": return jsonify({"message": "GET request for slice resilience received."}) if request.method == "POST": data = request.get_json() return jsonify({"message": "POST request for slice resilience management received", "data": data}) @resilience_app.route("/slice/<sliceId>/resilience", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_slice_resilience(sliceId): """Endpoint for managing a single slice's resilience""" if request.method == "GET": return jsonify({"message": f"GET request for slice {sliceId} resilience received"}) if request.method == "PUT": data = request.get_json() return jsonify({"message": f"PUT request to update slice {sliceId}'s resilience", "data": data}) if request.method == "DELETE": return jsonify({"message": f"DELETE request to remove slice {sliceId}'s resilience"}) @resilience_app.route("/aef-security/v1/check-authentication", methods=["POST"]) @jwt_required() def custom_operation_check(): if request.method == "POST": try: # Extraer el JSON del cuerpo del request data = request.get_json() # Ejemplo de lógica para determinar si redirigir # Este es un ejemplo ficticio, deberías reemplazarlo con la lógica real de tu caso if "redirect_temporary" in data: # Si se detecta "redirect_temporary", enviamos un 307 con un header Location return redirect("https://alternative-uri.example.com", code=307) elif "redirect_permanent" in data: # Si se detecta "redirect_permanent", enviamos un 308 con un header Location return redirect("https://alternative-uri.example.com", code=308) # Si no hay redirección, devolvemos el 200 OK con los datos return jsonify(data), 200 except Exception as e: print("Error:", str(e)) return jsonify({"error": str(e)}), 500 Network_app = Flask("Network_app") # JWT Configuration jwt_flask = JWTManager(Network_app) def config_network(): with open("/sdk/certificates/capif_cert_server.pem", "rb") as cert_file: cert = cert_file.read() crtObj = crypto.load_certificate(crypto.FILETYPE_PEM, cert) pubKeyObject = crtObj.get_pubkey() pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM, pubKeyObject) Network_app.config['JWT_ALGORITHM'] = 'RS256' Network_app.config['JWT_PUBLIC_KEY'] = pubKeyString # Dummy data for QoS, TSN, and Slicing management qos_profiles = { "1": {"profileId": 1, "level": "high"}, "2": {"profileId": 2, "level": "medium"} } tsn_profiles = { "profile1": {"name": "profile1", "config": "default"}, "profile2": {"name": "profile2", "config": "custom"} } network_slices = { "slice1": {"sliceId": "slice1", "status": "active"}, "slice2": {"sliceId": "slice2", "status": "inactive"} } # QoS Management Endpoints ### @Network_app.route("/<scsAsId>/qos", methods=["GET", "POST"]) @jwt_required() def manage_qos(scsAsId): """Endpoint to manage QoS levels for network traffic""" if request.method == "GET": return jsonify({"message": "GET request for QoS management", "scsAsId": scsAsId}) if request.method == "POST": data = request.get_json() return jsonify({"message": "POST request to manage QoS", "scsAsId": scsAsId, "data": data}) @Network_app.route("/<scsAsId>/qos/<profileId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_qos_profile(scsAsId, profileId): """Endpoint to manage single QoS profile""" if request.method == "GET": profile = qos_profiles.get(profileId) if profile: return jsonify({"profile": profile, "scsAsId": scsAsId}) return jsonify({"message": f"Profile {profileId} not found"}), 404 if request.method == "PUT": data = request.get_json() qos_profiles[profileId] = data return jsonify({"message": f"Profile {profileId} updated", "scsAsId": scsAsId, "data": data}) if request.method == "DELETE": if profileId in qos_profiles: del qos_profiles[profileId] return jsonify({"message": f"Profile {profileId} deleted"}) return jsonify({"message": f"Profile {profileId} not found"}), 404 # TSN Management Endpoints ### @Network_app.route("/profile", methods=["GET"]) @jwt_required() def list_tsn_profiles(): """Endpoint for retrieving the list of available TSN profiles""" return jsonify({"profiles": tsn_profiles}) @Network_app.route("/profile", methods=["GET"]) @jwt_required() def tsn_profile_by_name(): """Endpoint for retrieving information about a single TSN profile""" profile_name = request.args.get('name') profile = tsn_profiles.get(profile_name) if profile: return jsonify({"profile": profile}) return jsonify({"message": f"Profile {profile_name} not found"}), 404 @Network_app.route("/Network_apply", methods=["POST"]) @jwt_required() def Network_apply_tsn_configuration(): """Endpoint for configuring TSN connection parameters""" data = request.get_json() return jsonify({"message": "TSN configuration Network_applied", "data": data}) @Network_app.route("/clear", methods=["POST"]) @jwt_required() def clear_tsn_configuration(): """Endpoint for removing a previous TSN connection configuration""" return jsonify({"message": "TSN configuration cleared"}) # Network Slicing Management Endpoints ### @Network_app.route("/slice", methods=["GET", "POST"]) @jwt_required() def manage_slices(): """Endpoint for managing network slices""" if request.method == "GET": return jsonify({"slices": network_slices}) if request.method == "POST": data = request.get_json() slice_id = f"slice{len(network_slices) + 1}" network_slices[slice_id] = data return jsonify({"message": "Network slice added", "sliceId": slice_id, "data": data}) @Network_app.route("/aef-security/v1/check-authentication", methods=["POST"]) @jwt_required() def custom_operation_check_2(): if request.method == "POST": try: # Extraer el JSON del cuerpo del request data = request.get_json() # Ejemplo de lógica para determinar si redirigir # Este es un ejemplo ficticio, deberías reemplazarlo con la lógica real de tu caso if "redirect_temporary" in data: # Si se detecta "redirect_temporary", enviamos un 307 con un header Location return redirect("https://alternative-uri.example.com", code=307) elif "redirect_permanent" in data: # Si se detecta "redirect_permanent", enviamos un 308 con un header Location return redirect("https://alternative-uri.example.com", code=308) # Si no hay redirección, devolvemos el 200 OK con los datos return jsonify(data), 200 except Exception as e: print("Error:", str(e)) return jsonify({"error": str(e)}), 500 @Network_app.route("/slice/<sliceId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_slice(sliceId): """Endpoint for managing a single network slice""" if request.method == "GET": slice_ = network_slices.get(sliceId) if slice_: return jsonify({"slice": slice_}) return jsonify({"message": f"Slice {sliceId} not found"}), 404 if request.method == "PUT": data = request.get_json() network_slices[sliceId] = data return jsonify({"message": f"Slice {sliceId} updated", "data": data}) if request.method == "DELETE": if sliceId in network_slices: del network_slices[sliceId] return jsonify({"message": f"Slice {sliceId} deleted"}) return jsonify({"message": f"Slice {sliceId} not found"}), 404 def run_resilience_app(): serving.run_simple("0.0.0.0", 8088, resilience_app) def run_network_app(): serving.run_simple("0.0.0.0", 8888, Network_app) if __name__ == "__main__": try: # Initialize the connector Loading @@ -326,37 +26,8 @@ if __name__ == "__main__": capif_connector.publish_services() # capif_connector.api_description_path = "./nef_upf_vendor_2.json" # capif_connector.publish_req['publisher_apf_id'] = api5g # capif_connector.publish_req['publisher_aefs_ids'] = [Peñuelas, Distrito, Valladolid] # capif_connector.publish_services() # capif_connector.publish_req['service_api_id'] = capif_connector.provider_capif_ids['5G-Network-App-API'] # capif_connector.update_service() print("API PUBLISHED") # capif_connector.publish_req['service_api_id'] = capif_connector.provider_service_ids['5G-Network-App-API'] # capif_connector.get_service() # config_network() # config_resilience() # t1 = threading.Thread(target=run_resilience_app) # t2 = threading.Thread(target=run_network_app) # t1.start() # t2.start() # t1.join() # t2.join() # print("SERVERS WORKING") except FileNotFoundError as e: print(f"Error: {e}") except json.JSONDecodeError as e: Loading Loading
network_app_samples/network_app_provider_sample/network_app_provider.py +0 −329 Original line number Diff line number Diff line import threading from opencapif_sdk import capif_provider_connector from OpenSSL import crypto from flask_jwt_extended import jwt_required, JWTManager from flask import Flask, jsonify, request, redirect import json from werkzeug import serving capif_sdk_config_path = "./testing_CICD_capif_sdk_config_sample.json" resilience_app = Flask("resilience_app") jwt_flask = JWTManager(resilience_app) def config_resilience(): # JWT Configuration resilience_app.config['JWT_ALGORITHM'] = 'RS256' resilience_app.config['JWT_PUBLIC_KEY'] = "YourPublicKeyHere" with open("/sdk/certificates/capif_cert_server.pem", "rb") as cert_file: cert = cert_file.read() crtObj = crypto.load_certificate(crypto.FILETYPE_PEM, cert) pubKeyObject = crtObj.get_pubkey() pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM, pubKeyObject) resilience_app.config['JWT_ALGORITHM'] = 'RS256' resilience_app.config['JWT_PUBLIC_KEY'] = pubKeyString # Dummy data for resilience management (In practice, this would connect to a database or service) resilience_profiles = { "1": {"profileId": 1, "status": "active"}, "2": {"profileId": 2, "status": "inactive"} } @resilience_app.route("/resilience", methods=["GET", "POST"]) @jwt_required() def manage_resilience(): """Endpoint to manage resilience functionalities in 6G networks""" if request.method == "GET": return jsonify({"message": "GET request for resilience management received."}) if request.method == "POST": data = request.get_json() return jsonify({"message": "Resilience management POST request received", "data": data}) @resilience_app.route("/resilience/<profileId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_resilience(profileId): """Endpoint to manage a single resilience profile""" if request.method == "GET": profile = resilience_profiles.get(profileId, None) if profile: return jsonify({"profile": profile}) else: return jsonify({"message": f"Profile {profileId} not found"}), 404 if request.method == "PUT": data = request.get_json() resilience_profiles[profileId] = data return jsonify({"message": f"Profile {profileId} updated", "data": data}) if request.method == "DELETE": if profileId in resilience_profiles: del resilience_profiles[profileId] return jsonify({"message": f"Profile {profileId} deleted"}) else: return jsonify({"message": f"Profile {profileId} not found"}), 404 @resilience_app.route("/slice/resilience", methods=["GET", "POST"]) @jwt_required() def manage_slice_resilience(): """Endpoint for managing resilience in network slices""" if request.method == "GET": return jsonify({"message": "GET request for slice resilience received."}) if request.method == "POST": data = request.get_json() return jsonify({"message": "POST request for slice resilience management received", "data": data}) @resilience_app.route("/slice/<sliceId>/resilience", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_slice_resilience(sliceId): """Endpoint for managing a single slice's resilience""" if request.method == "GET": return jsonify({"message": f"GET request for slice {sliceId} resilience received"}) if request.method == "PUT": data = request.get_json() return jsonify({"message": f"PUT request to update slice {sliceId}'s resilience", "data": data}) if request.method == "DELETE": return jsonify({"message": f"DELETE request to remove slice {sliceId}'s resilience"}) @resilience_app.route("/aef-security/v1/check-authentication", methods=["POST"]) @jwt_required() def custom_operation_check(): if request.method == "POST": try: # Extraer el JSON del cuerpo del request data = request.get_json() # Ejemplo de lógica para determinar si redirigir # Este es un ejemplo ficticio, deberías reemplazarlo con la lógica real de tu caso if "redirect_temporary" in data: # Si se detecta "redirect_temporary", enviamos un 307 con un header Location return redirect("https://alternative-uri.example.com", code=307) elif "redirect_permanent" in data: # Si se detecta "redirect_permanent", enviamos un 308 con un header Location return redirect("https://alternative-uri.example.com", code=308) # Si no hay redirección, devolvemos el 200 OK con los datos return jsonify(data), 200 except Exception as e: print("Error:", str(e)) return jsonify({"error": str(e)}), 500 Network_app = Flask("Network_app") # JWT Configuration jwt_flask = JWTManager(Network_app) def config_network(): with open("/sdk/certificates/capif_cert_server.pem", "rb") as cert_file: cert = cert_file.read() crtObj = crypto.load_certificate(crypto.FILETYPE_PEM, cert) pubKeyObject = crtObj.get_pubkey() pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM, pubKeyObject) Network_app.config['JWT_ALGORITHM'] = 'RS256' Network_app.config['JWT_PUBLIC_KEY'] = pubKeyString # Dummy data for QoS, TSN, and Slicing management qos_profiles = { "1": {"profileId": 1, "level": "high"}, "2": {"profileId": 2, "level": "medium"} } tsn_profiles = { "profile1": {"name": "profile1", "config": "default"}, "profile2": {"name": "profile2", "config": "custom"} } network_slices = { "slice1": {"sliceId": "slice1", "status": "active"}, "slice2": {"sliceId": "slice2", "status": "inactive"} } # QoS Management Endpoints ### @Network_app.route("/<scsAsId>/qos", methods=["GET", "POST"]) @jwt_required() def manage_qos(scsAsId): """Endpoint to manage QoS levels for network traffic""" if request.method == "GET": return jsonify({"message": "GET request for QoS management", "scsAsId": scsAsId}) if request.method == "POST": data = request.get_json() return jsonify({"message": "POST request to manage QoS", "scsAsId": scsAsId, "data": data}) @Network_app.route("/<scsAsId>/qos/<profileId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_qos_profile(scsAsId, profileId): """Endpoint to manage single QoS profile""" if request.method == "GET": profile = qos_profiles.get(profileId) if profile: return jsonify({"profile": profile, "scsAsId": scsAsId}) return jsonify({"message": f"Profile {profileId} not found"}), 404 if request.method == "PUT": data = request.get_json() qos_profiles[profileId] = data return jsonify({"message": f"Profile {profileId} updated", "scsAsId": scsAsId, "data": data}) if request.method == "DELETE": if profileId in qos_profiles: del qos_profiles[profileId] return jsonify({"message": f"Profile {profileId} deleted"}) return jsonify({"message": f"Profile {profileId} not found"}), 404 # TSN Management Endpoints ### @Network_app.route("/profile", methods=["GET"]) @jwt_required() def list_tsn_profiles(): """Endpoint for retrieving the list of available TSN profiles""" return jsonify({"profiles": tsn_profiles}) @Network_app.route("/profile", methods=["GET"]) @jwt_required() def tsn_profile_by_name(): """Endpoint for retrieving information about a single TSN profile""" profile_name = request.args.get('name') profile = tsn_profiles.get(profile_name) if profile: return jsonify({"profile": profile}) return jsonify({"message": f"Profile {profile_name} not found"}), 404 @Network_app.route("/Network_apply", methods=["POST"]) @jwt_required() def Network_apply_tsn_configuration(): """Endpoint for configuring TSN connection parameters""" data = request.get_json() return jsonify({"message": "TSN configuration Network_applied", "data": data}) @Network_app.route("/clear", methods=["POST"]) @jwt_required() def clear_tsn_configuration(): """Endpoint for removing a previous TSN connection configuration""" return jsonify({"message": "TSN configuration cleared"}) # Network Slicing Management Endpoints ### @Network_app.route("/slice", methods=["GET", "POST"]) @jwt_required() def manage_slices(): """Endpoint for managing network slices""" if request.method == "GET": return jsonify({"slices": network_slices}) if request.method == "POST": data = request.get_json() slice_id = f"slice{len(network_slices) + 1}" network_slices[slice_id] = data return jsonify({"message": "Network slice added", "sliceId": slice_id, "data": data}) @Network_app.route("/aef-security/v1/check-authentication", methods=["POST"]) @jwt_required() def custom_operation_check_2(): if request.method == "POST": try: # Extraer el JSON del cuerpo del request data = request.get_json() # Ejemplo de lógica para determinar si redirigir # Este es un ejemplo ficticio, deberías reemplazarlo con la lógica real de tu caso if "redirect_temporary" in data: # Si se detecta "redirect_temporary", enviamos un 307 con un header Location return redirect("https://alternative-uri.example.com", code=307) elif "redirect_permanent" in data: # Si se detecta "redirect_permanent", enviamos un 308 con un header Location return redirect("https://alternative-uri.example.com", code=308) # Si no hay redirección, devolvemos el 200 OK con los datos return jsonify(data), 200 except Exception as e: print("Error:", str(e)) return jsonify({"error": str(e)}), 500 @Network_app.route("/slice/<sliceId>", methods=["GET", "PUT", "DELETE"]) @jwt_required() def manage_single_slice(sliceId): """Endpoint for managing a single network slice""" if request.method == "GET": slice_ = network_slices.get(sliceId) if slice_: return jsonify({"slice": slice_}) return jsonify({"message": f"Slice {sliceId} not found"}), 404 if request.method == "PUT": data = request.get_json() network_slices[sliceId] = data return jsonify({"message": f"Slice {sliceId} updated", "data": data}) if request.method == "DELETE": if sliceId in network_slices: del network_slices[sliceId] return jsonify({"message": f"Slice {sliceId} deleted"}) return jsonify({"message": f"Slice {sliceId} not found"}), 404 def run_resilience_app(): serving.run_simple("0.0.0.0", 8088, resilience_app) def run_network_app(): serving.run_simple("0.0.0.0", 8888, Network_app) if __name__ == "__main__": try: # Initialize the connector Loading @@ -326,37 +26,8 @@ if __name__ == "__main__": capif_connector.publish_services() # capif_connector.api_description_path = "./nef_upf_vendor_2.json" # capif_connector.publish_req['publisher_apf_id'] = api5g # capif_connector.publish_req['publisher_aefs_ids'] = [Peñuelas, Distrito, Valladolid] # capif_connector.publish_services() # capif_connector.publish_req['service_api_id'] = capif_connector.provider_capif_ids['5G-Network-App-API'] # capif_connector.update_service() print("API PUBLISHED") # capif_connector.publish_req['service_api_id'] = capif_connector.provider_service_ids['5G-Network-App-API'] # capif_connector.get_service() # config_network() # config_resilience() # t1 = threading.Thread(target=run_resilience_app) # t2 = threading.Thread(target=run_network_app) # t1.start() # t2.start() # t1.join() # t2.join() # print("SERVERS WORKING") except FileNotFoundError as e: print(f"Error: {e}") except json.JSONDecodeError as e: Loading