Loading one_provider_gui/service/demo_api_one.json→one_provider_gui/service/demo_api_oauth.json +4 −4 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ "expiry": "2021-11-30T10:32:02.004Z", "resources": [ { "resourceName": "create-vm-endpoint", "resourceName": "Hello endpoint", "commType": "REQUEST_RESPONSE", "uri": "/hello", "custOpName": "string", Loading @@ -34,18 +34,18 @@ ], "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": ["Oauth"], "securityMethods": ["OAUTH"], "interfaceDescriptions": [ { "ipv4Addr": "one_provider_gui", "port": 8088, "securityMethods": ["Oauth"] "securityMethods": ["OAUTH"] } ] } ], "description": "Hello api services", "supportedFeatures": "fffff", "supportedFeatures": "020", "shareableInfo": { "isShareable": true, "capifProvDoms": [ Loading one_provider_gui/service/events_destination.py +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from termcolor import colored app = Flask(__name__) @app.route('/', methods=['POST']) def recibir_solicitud(): def receive_request(): try: response = request.get_json() Loading one_provider_gui/service/service_oauth.py +61 −4 Original line number Diff line number Diff line from flask import Flask, jsonify, request from flask import Flask, jsonify, request, current_app from flask_jwt_extended import jwt_required, JWTManager, get_jwt_identity, get_jwt import ssl from werkzeug import serving Loading @@ -6,11 +6,13 @@ import socket, ssl import OpenSSL from OpenSSL import crypto import jwt import json app = Flask(__name__) jwt_flask = JWTManager(app) # Load the public key from the certificate file to verify JWT tokens with open("../capif_ops/certs/cert_server.pem", "rb") as cert_file: cert= cert_file.read() Loading @@ -22,16 +24,71 @@ pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM,pubKeyObject) app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_PUBLIC_KEY'] = pubKeyString # Load the aef_id from the configuration file and the api name try: with open("../capif_ops/config_files/demo_values.json", "r") as f: config_data = json.load(f) if "aef_id" not in config_data: raise ValueError("Configuration file is missing 'aef_id'.") aef_id = config_data.get("aef_id", "default_aef_id") api_name = "demo_api_OCF" print(f"Using aef_id: {aef_id} and api_name: {api_name}") except FileNotFoundError: config_data = {} print("File ../capif_ops/config_files/demo_values.json dont found.") # Function to check the validity of the OAuth token and extract the invoker id def __check_oauth_token__(): """ Check if the scope is valid and return the Invoker id """ claims = get_jwt() invoker_id = claims.get('iss', None) scope = claims.get('scope', None) res_owner_id = claims.get('res_owner_id', None) current_app.logger.debug(f"invoker_id: {invoker_id}, scope: {scope}, res_owner_id: {res_owner_id}") if not invoker_id or not scope: raise Exception("Invalid token: missing invoker_id or scope") if not scope.startswith("3gpp#"): raise Exception("Invalid scope format: must start with '3gpp#'") scope_body = scope[len("3gpp#"):] aef_entries = scope_body.split(";") for entry in aef_entries: scope_aef_id, apis = entry.split(":", 1) scope_api_list = [api.strip() for api in apis.split(",")] if scope_aef_id == aef_id and api_name in scope_api_list: return invoker_id raise Exception("Invalid token: scope does not match the required aef_id and api_name") # Route to handle the hello request @app.route("/hello", methods=["POST"]) @jwt_required() def hello(): try: invoker_id = __check_oauth_token__() request_data = request.get_json() user_name = request_data['name'] return jsonify(f"Welcome to OpenCAPIF {user_name}!") return jsonify(f"Welcome to OpenCAPIF {user_name} ({invoker_id})!") except Exception as e: return jsonify({"error": str(e)}), 400 if __name__ == '__main__': Loading Loading
one_provider_gui/service/demo_api_one.json→one_provider_gui/service/demo_api_oauth.json +4 −4 Original line number Diff line number Diff line Loading @@ -10,7 +10,7 @@ "expiry": "2021-11-30T10:32:02.004Z", "resources": [ { "resourceName": "create-vm-endpoint", "resourceName": "Hello endpoint", "commType": "REQUEST_RESPONSE", "uri": "/hello", "custOpName": "string", Loading @@ -34,18 +34,18 @@ ], "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": ["Oauth"], "securityMethods": ["OAUTH"], "interfaceDescriptions": [ { "ipv4Addr": "one_provider_gui", "port": 8088, "securityMethods": ["Oauth"] "securityMethods": ["OAUTH"] } ] } ], "description": "Hello api services", "supportedFeatures": "fffff", "supportedFeatures": "020", "shareableInfo": { "isShareable": true, "capifProvDoms": [ Loading
one_provider_gui/service/events_destination.py +1 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from termcolor import colored app = Flask(__name__) @app.route('/', methods=['POST']) def recibir_solicitud(): def receive_request(): try: response = request.get_json() Loading
one_provider_gui/service/service_oauth.py +61 −4 Original line number Diff line number Diff line from flask import Flask, jsonify, request from flask import Flask, jsonify, request, current_app from flask_jwt_extended import jwt_required, JWTManager, get_jwt_identity, get_jwt import ssl from werkzeug import serving Loading @@ -6,11 +6,13 @@ import socket, ssl import OpenSSL from OpenSSL import crypto import jwt import json app = Flask(__name__) jwt_flask = JWTManager(app) # Load the public key from the certificate file to verify JWT tokens with open("../capif_ops/certs/cert_server.pem", "rb") as cert_file: cert= cert_file.read() Loading @@ -22,16 +24,71 @@ pubKeyString = crypto.dump_publickey(crypto.FILETYPE_PEM,pubKeyObject) app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_PUBLIC_KEY'] = pubKeyString # Load the aef_id from the configuration file and the api name try: with open("../capif_ops/config_files/demo_values.json", "r") as f: config_data = json.load(f) if "aef_id" not in config_data: raise ValueError("Configuration file is missing 'aef_id'.") aef_id = config_data.get("aef_id", "default_aef_id") api_name = "demo_api_OCF" print(f"Using aef_id: {aef_id} and api_name: {api_name}") except FileNotFoundError: config_data = {} print("File ../capif_ops/config_files/demo_values.json dont found.") # Function to check the validity of the OAuth token and extract the invoker id def __check_oauth_token__(): """ Check if the scope is valid and return the Invoker id """ claims = get_jwt() invoker_id = claims.get('iss', None) scope = claims.get('scope', None) res_owner_id = claims.get('res_owner_id', None) current_app.logger.debug(f"invoker_id: {invoker_id}, scope: {scope}, res_owner_id: {res_owner_id}") if not invoker_id or not scope: raise Exception("Invalid token: missing invoker_id or scope") if not scope.startswith("3gpp#"): raise Exception("Invalid scope format: must start with '3gpp#'") scope_body = scope[len("3gpp#"):] aef_entries = scope_body.split(";") for entry in aef_entries: scope_aef_id, apis = entry.split(":", 1) scope_api_list = [api.strip() for api in apis.split(",")] if scope_aef_id == aef_id and api_name in scope_api_list: return invoker_id raise Exception("Invalid token: scope does not match the required aef_id and api_name") # Route to handle the hello request @app.route("/hello", methods=["POST"]) @jwt_required() def hello(): try: invoker_id = __check_oauth_token__() request_data = request.get_json() user_name = request_data['name'] return jsonify(f"Welcome to OpenCAPIF {user_name}!") return jsonify(f"Welcome to OpenCAPIF {user_name} ({invoker_id})!") except Exception as e: return jsonify({"error": str(e)}), 400 if __name__ == '__main__': Loading