Loading services/register/config.yaml +2 −0 Original line number Original line Diff line number Diff line Loading @@ -14,5 +14,7 @@ ca_factory: { register: { register: { register_uuid: '6ba7b810-9dad-11d1-80b4-00c04fd430c8', register_uuid: '6ba7b810-9dad-11d1-80b4-00c04fd430c8', refresh_expiration: 30, #days token_expiration: 10, #mins admin_users: {admin: "password123"} admin_users: {admin: "password123"} } } No newline at end of file services/register/register_service/__main__.py +5 −2 Original line number Original line Diff line number Diff line Loading @@ -6,11 +6,12 @@ from flask_jwt_extended import JWTManager from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey import requests import requests import json import json import jwt from .config import Config from .config import Config app = Flask(__name__) app = Flask(__name__) jwt = JWTManager(app) jwt_manager = JWTManager(app) config = Config().get_config() config = Config().get_config() Loading Loading @@ -65,15 +66,17 @@ cert_file = open("register_service/certs/ca_root.crt", 'wb') cert_file.write(bytes(ca_root, 'utf-8')) cert_file.write(bytes(ca_root, 'utf-8')) cert_file.close() cert_file.close() # Request CAPIF private key to encode the token # Request CAPIF private key to encode the CAPIF token url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"} headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"} response = requests.request("GET", url, headers=headers, verify = False) response = requests.request("GET", url, headers=headers, verify = False) key_data = json.loads(response.text)["data"]["data"]["key"] key_data = json.loads(response.text)["data"]["data"]["key"] app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_PRIVATE_KEY'] = key_data app.config['JWT_PRIVATE_KEY'] = key_data app.config['REGISTRE_SECRET_KEY'] = config["register"]["register_uuid"] app.register_blueprint(register_routes) app.register_blueprint(register_routes) Loading services/register/register_service/controllers/register_controller.py +70 −28 Original line number Original line Diff line number Diff line Loading @@ -3,13 +3,34 @@ from flask import current_app, Flask, jsonify, request, Blueprint from flask import current_app, Flask, jsonify, request, Blueprint from ..core.register_operations import RegisterOperations from ..core.register_operations import RegisterOperations from ..config import Config from ..config import Config from functools import wraps from datetime import datetime, timedelta from flask_httpauth import HTTPBasicAuth from flask_httpauth import HTTPBasicAuth import jwt auth = HTTPBasicAuth() auth = HTTPBasicAuth() config = Config().get_config() config = Config().get_config() register_routes = Blueprint("register_routes", __name__) register_operation = RegisterOperations() # Function to generate access tokens and refresh tokens def generate_tokens(username): access_payload = { 'username': username, 'exp': datetime.now() + timedelta(minutes=config["register"]["token_expiration"]) } refresh_payload = { 'username': username, 'exp': datetime.now() + timedelta(days=config["register"]["refresh_expiration"]) } access_token = jwt.encode(access_payload, current_app.config['REGISTRE_SECRET_KEY'], algorithm='HS256') refresh_token = jwt.encode(refresh_payload, current_app.config['REGISTRE_SECRET_KEY'], algorithm='HS256') return access_token, refresh_token # Function in charge of verifying the basic auth @auth.verify_password @auth.verify_password def verify_password(username, password): def verify_password(username, password): users = register_operation.get_users()[0].json["users"] users = register_operation.get_users()[0].json["users"] Loading @@ -19,23 +40,50 @@ def verify_password(username, password): if user["username"] == username and user["password"]==password: if user["username"] == username and user["password"]==password: return username, "client" return username, "client" def admin_required(fn): # Function responsible for verifying the token def wrapper(*args, **kwargs): def admin_required(): username, role = auth.current_user() def decorator(f): if role == 'admin': @wraps(f) return fn(*args, **kwargs) def decorated(*args, **kwargs): else: return jsonify(message="Access denied. Administrator privileges required."), 403 wrapper.__name__ = fn.__name__ return wrapper register_routes = Blueprint("register_routes", __name__) token = request.headers.get('Authorization') register_operation = RegisterOperations() if not token: return jsonify({'message': 'Token is missing'}), 401 @register_routes.route("/createUser", methods=["POST"]) if token.startswith('Bearer '): token = token.split('Bearer ')[1] if not token: return jsonify({'message': 'Token is missing'}), 401 try: data = jwt.decode(token, current_app.config['REGISTRE_SECRET_KEY'], algorithms=['HS256'], options={'verify_exp': True}) username = data['username'] return f(username, *args, **kwargs) except Exception as e: return jsonify({'message': str(e)}), 401 return decorated return decorator @register_routes.route('/login', methods=['POST']) @auth.login_required @auth.login_required @admin_required def login(): def register(): username, rol = auth.current_user() if rol != "admin": return jsonify(message="Unauthorized. Administrator privileges required."), 401 access_token, refresh_token = generate_tokens(username) return jsonify({'access_token': access_token, 'refresh_token': refresh_token}) @register_routes.route('/refresh', methods=['POST']) @admin_required() def refresh_token(username): access_token, _ = generate_tokens(username) return jsonify({'access_token': access_token}) @register_routes.route("/createUser", methods=["POST"]) @admin_required() def register(username): username = request.json["username"] username = request.json["username"] password = request.json["password"] password = request.json["password"] description = request.json["description"] description = request.json["description"] Loading @@ -46,22 +94,16 @@ def register(): @register_routes.route("/getauth", methods=["GET"]) @register_routes.route("/getauth", methods=["GET"]) @auth.login_required @auth.login_required def getauth(): def getauth(): username, role = auth.current_user() username, _ = auth.current_user() return register_operation.get_auth(username) return register_operation.get_auth(username) @register_routes.route("/deleteUser", methods=["DELETE"]) @register_routes.route("/deleteUser/<uuid>", methods=["DELETE"]) @auth.login_required @admin_required() @admin_required def remove(username, uuid): def remove(): return register_operation.remove_user(uuid) username = request.json["username"] password = request.json["password"] return register_operation.remove_user(username, password) @register_routes.route("/getUsers", methods=["GET"]) @register_routes.route("/getUsers", methods=["GET"]) @auth.login_required @admin_required() @admin_required def getUsers(username): def getUsers(): return register_operation.get_users() return register_operation.get_users() services/register/register_service/core/register_operations.py +5 −4 Original line number Original line Diff line number Diff line Loading @@ -26,7 +26,7 @@ class RegisterOperations: user_info = dict(uuid=user_uuid, username=username, password=password, description=description, email=email, onboarding_date=datetime.now()) user_info = dict(uuid=user_uuid, username=username, password=password, description=description, email=email, onboarding_date=datetime.now()) obj = mycol.insert_one(user_info) obj = mycol.insert_one(user_info) return jsonify(message="invoker registered successfully", uuid=user_uuid), 201 return jsonify(message="User registered successfully", uuid=user_uuid), 201 def get_auth(self, username): def get_auth(self, username): Loading @@ -51,16 +51,17 @@ class RegisterOperations: ccf_api_onboarding_url="api-provider-management/v1/registrations", ccf_api_onboarding_url="api-provider-management/v1/registrations", ccf_publish_url="published-apis/v1/<apfId>/service-apis", ccf_publish_url="published-apis/v1/<apfId>/service-apis", ccf_onboarding_url="api-invoker-management/v1/onboardedInvokers", ccf_onboarding_url="api-invoker-management/v1/onboardedInvokers", ccf_discover_url="service-apis/v1/allServiceAPIs?api-invoker-id="), 200 ccf_discover_url="service-apis/v1/allServiceAPIs?api-invoker-id=", ccf_security_url="capif-security/v1/trustedInvokers/<apiInvokerId>"), 200 except Exception as e: except Exception as e: return jsonify(message=f"Errors when try getting auth: {e}"), 500 return jsonify(message=f"Errors when try getting auth: {e}"), 500 def remove_user(self, username, password): def remove_user(self, uuid): mycol = self.db.get_col_by_name(self.db.capif_users) mycol = self.db.get_col_by_name(self.db.capif_users) try: try: mycol.delete_one({"username": username, "password": password}) mycol.delete_one({"uuid": uuid}) # Request to the helper to delete invokers and providers # Request to the helper to delete invokers and providers Loading Loading
services/register/config.yaml +2 −0 Original line number Original line Diff line number Diff line Loading @@ -14,5 +14,7 @@ ca_factory: { register: { register: { register_uuid: '6ba7b810-9dad-11d1-80b4-00c04fd430c8', register_uuid: '6ba7b810-9dad-11d1-80b4-00c04fd430c8', refresh_expiration: 30, #days token_expiration: 10, #mins admin_users: {admin: "password123"} admin_users: {admin: "password123"} } } No newline at end of file
services/register/register_service/__main__.py +5 −2 Original line number Original line Diff line number Diff line Loading @@ -6,11 +6,12 @@ from flask_jwt_extended import JWTManager from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey import requests import requests import json import json import jwt from .config import Config from .config import Config app = Flask(__name__) app = Flask(__name__) jwt = JWTManager(app) jwt_manager = JWTManager(app) config = Config().get_config() config = Config().get_config() Loading Loading @@ -65,15 +66,17 @@ cert_file = open("register_service/certs/ca_root.crt", 'wb') cert_file.write(bytes(ca_root, 'utf-8')) cert_file.write(bytes(ca_root, 'utf-8')) cert_file.close() cert_file.close() # Request CAPIF private key to encode the token # Request CAPIF private key to encode the CAPIF token url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"} headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"} response = requests.request("GET", url, headers=headers, verify = False) response = requests.request("GET", url, headers=headers, verify = False) key_data = json.loads(response.text)["data"]["data"]["key"] key_data = json.loads(response.text)["data"]["data"]["key"] app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_ALGORITHM'] = 'RS256' app.config['JWT_PRIVATE_KEY'] = key_data app.config['JWT_PRIVATE_KEY'] = key_data app.config['REGISTRE_SECRET_KEY'] = config["register"]["register_uuid"] app.register_blueprint(register_routes) app.register_blueprint(register_routes) Loading
services/register/register_service/controllers/register_controller.py +70 −28 Original line number Original line Diff line number Diff line Loading @@ -3,13 +3,34 @@ from flask import current_app, Flask, jsonify, request, Blueprint from flask import current_app, Flask, jsonify, request, Blueprint from ..core.register_operations import RegisterOperations from ..core.register_operations import RegisterOperations from ..config import Config from ..config import Config from functools import wraps from datetime import datetime, timedelta from flask_httpauth import HTTPBasicAuth from flask_httpauth import HTTPBasicAuth import jwt auth = HTTPBasicAuth() auth = HTTPBasicAuth() config = Config().get_config() config = Config().get_config() register_routes = Blueprint("register_routes", __name__) register_operation = RegisterOperations() # Function to generate access tokens and refresh tokens def generate_tokens(username): access_payload = { 'username': username, 'exp': datetime.now() + timedelta(minutes=config["register"]["token_expiration"]) } refresh_payload = { 'username': username, 'exp': datetime.now() + timedelta(days=config["register"]["refresh_expiration"]) } access_token = jwt.encode(access_payload, current_app.config['REGISTRE_SECRET_KEY'], algorithm='HS256') refresh_token = jwt.encode(refresh_payload, current_app.config['REGISTRE_SECRET_KEY'], algorithm='HS256') return access_token, refresh_token # Function in charge of verifying the basic auth @auth.verify_password @auth.verify_password def verify_password(username, password): def verify_password(username, password): users = register_operation.get_users()[0].json["users"] users = register_operation.get_users()[0].json["users"] Loading @@ -19,23 +40,50 @@ def verify_password(username, password): if user["username"] == username and user["password"]==password: if user["username"] == username and user["password"]==password: return username, "client" return username, "client" def admin_required(fn): # Function responsible for verifying the token def wrapper(*args, **kwargs): def admin_required(): username, role = auth.current_user() def decorator(f): if role == 'admin': @wraps(f) return fn(*args, **kwargs) def decorated(*args, **kwargs): else: return jsonify(message="Access denied. Administrator privileges required."), 403 wrapper.__name__ = fn.__name__ return wrapper register_routes = Blueprint("register_routes", __name__) token = request.headers.get('Authorization') register_operation = RegisterOperations() if not token: return jsonify({'message': 'Token is missing'}), 401 @register_routes.route("/createUser", methods=["POST"]) if token.startswith('Bearer '): token = token.split('Bearer ')[1] if not token: return jsonify({'message': 'Token is missing'}), 401 try: data = jwt.decode(token, current_app.config['REGISTRE_SECRET_KEY'], algorithms=['HS256'], options={'verify_exp': True}) username = data['username'] return f(username, *args, **kwargs) except Exception as e: return jsonify({'message': str(e)}), 401 return decorated return decorator @register_routes.route('/login', methods=['POST']) @auth.login_required @auth.login_required @admin_required def login(): def register(): username, rol = auth.current_user() if rol != "admin": return jsonify(message="Unauthorized. Administrator privileges required."), 401 access_token, refresh_token = generate_tokens(username) return jsonify({'access_token': access_token, 'refresh_token': refresh_token}) @register_routes.route('/refresh', methods=['POST']) @admin_required() def refresh_token(username): access_token, _ = generate_tokens(username) return jsonify({'access_token': access_token}) @register_routes.route("/createUser", methods=["POST"]) @admin_required() def register(username): username = request.json["username"] username = request.json["username"] password = request.json["password"] password = request.json["password"] description = request.json["description"] description = request.json["description"] Loading @@ -46,22 +94,16 @@ def register(): @register_routes.route("/getauth", methods=["GET"]) @register_routes.route("/getauth", methods=["GET"]) @auth.login_required @auth.login_required def getauth(): def getauth(): username, role = auth.current_user() username, _ = auth.current_user() return register_operation.get_auth(username) return register_operation.get_auth(username) @register_routes.route("/deleteUser", methods=["DELETE"]) @register_routes.route("/deleteUser/<uuid>", methods=["DELETE"]) @auth.login_required @admin_required() @admin_required def remove(username, uuid): def remove(): return register_operation.remove_user(uuid) username = request.json["username"] password = request.json["password"] return register_operation.remove_user(username, password) @register_routes.route("/getUsers", methods=["GET"]) @register_routes.route("/getUsers", methods=["GET"]) @auth.login_required @admin_required() @admin_required def getUsers(username): def getUsers(): return register_operation.get_users() return register_operation.get_users()
services/register/register_service/core/register_operations.py +5 −4 Original line number Original line Diff line number Diff line Loading @@ -26,7 +26,7 @@ class RegisterOperations: user_info = dict(uuid=user_uuid, username=username, password=password, description=description, email=email, onboarding_date=datetime.now()) user_info = dict(uuid=user_uuid, username=username, password=password, description=description, email=email, onboarding_date=datetime.now()) obj = mycol.insert_one(user_info) obj = mycol.insert_one(user_info) return jsonify(message="invoker registered successfully", uuid=user_uuid), 201 return jsonify(message="User registered successfully", uuid=user_uuid), 201 def get_auth(self, username): def get_auth(self, username): Loading @@ -51,16 +51,17 @@ class RegisterOperations: ccf_api_onboarding_url="api-provider-management/v1/registrations", ccf_api_onboarding_url="api-provider-management/v1/registrations", ccf_publish_url="published-apis/v1/<apfId>/service-apis", ccf_publish_url="published-apis/v1/<apfId>/service-apis", ccf_onboarding_url="api-invoker-management/v1/onboardedInvokers", ccf_onboarding_url="api-invoker-management/v1/onboardedInvokers", ccf_discover_url="service-apis/v1/allServiceAPIs?api-invoker-id="), 200 ccf_discover_url="service-apis/v1/allServiceAPIs?api-invoker-id=", ccf_security_url="capif-security/v1/trustedInvokers/<apiInvokerId>"), 200 except Exception as e: except Exception as e: return jsonify(message=f"Errors when try getting auth: {e}"), 500 return jsonify(message=f"Errors when try getting auth: {e}"), 500 def remove_user(self, username, password): def remove_user(self, uuid): mycol = self.db.get_col_by_name(self.db.capif_users) mycol = self.db.get_col_by_name(self.db.capif_users) try: try: mycol.delete_one({"username": username, "password": password}) mycol.delete_one({"uuid": uuid}) # Request to the helper to delete invokers and providers # Request to the helper to delete invokers and providers Loading