Skip to content
app.py 3.07 KiB
Newer Older
Jorge Moratinos's avatar
Jorge Moratinos committed

from flask import Flask
from controllers.register_controller import register_routes
Jorge Moratinos's avatar
Jorge Moratinos committed
from flask_jwt_extended import JWTManager
from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey
import requests
import json
Jorge Moratinos's avatar
Jorge Moratinos committed

app = Flask(__name__)

torrespel's avatar
torrespel committed

torrespel's avatar
torrespel committed
jwt_manager = JWTManager(app)
Jorge Moratinos's avatar
Jorge Moratinos committed

config = Config().get_config()
Jorge Moratinos's avatar
Jorge Moratinos committed

# Create a superadmin CSR and keys
key = PKey()
key.generate_key(TYPE_RSA, 2048)
req = X509Req()
req.get_subject().O = 'Telefonica I+D'
req.get_subject().OU = 'Innovation'
req.get_subject().L = 'Madrid'
req.get_subject().ST = 'Madrid'
req.get_subject().C = 'ES'
req.get_subject().emailAddress = 'inno@tid.es'
req.set_pubkey(key)
req.sign(key, 'sha256')

csr_request = dump_certificate_request(FILETYPE_PEM, req)
private_key = dump_privatekey(FILETYPE_PEM, key)

# Save superadmin private key
key_file = open("certs/superadmin.key", 'wb+')
key_file.write(bytes(private_key))
key_file.close()

# Request superadmin certificate
url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"])  
headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"}  
data = {
    'format':'pem_bundle',
    'ttl': '43000h',
    'csr': csr_request,
    'common_name': "superadmin"
}

Jorge Moratinos's avatar
Jorge Moratinos committed
response = requests.request("POST", url, headers=headers, data=data, verify = config["ca_factory"].get("verify", False))
superadmin_cert = json.loads(response.text)['data']['certificate']

# Save the superadmin certificate
cert_file = open("certs/superadmin.crt", 'wb')
cert_file.write(bytes(superadmin_cert, 'utf-8'))
cert_file.close()

url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca"
headers = {

        'X-Vault-Token': config['ca_factory']['token']
}
Jorge Moratinos's avatar
Jorge Moratinos committed
response = requests.request("GET", url, headers=headers, verify = config["ca_factory"].get("verify", False))

ca_root = json.loads(response.text)['data']['data']['ca']
cert_file = open("certs/ca_root.crt", 'wb')
cert_file.write(bytes(ca_root, 'utf-8'))
cert_file.close()

torrespel's avatar
torrespel committed
# Request CAPIF private key to encode the CAPIF token
url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"])
headers = {'X-Vault-Token': f"{config["ca_factory"]["token"]}"}
Jorge Moratinos's avatar
Jorge Moratinos committed
response = requests.request("GET", url, headers=headers, verify = config["ca_factory"].get("verify", False))

key_data = json.loads(response.text)["data"]["data"]["key"]
Jorge Moratinos's avatar
Jorge Moratinos committed

torrespel's avatar
torrespel committed
# Create an Admin in the Admin Collection
client = MongoDatabse()
torrespel's avatar
torrespel committed
if not client.get_col_by_name(client.capif_admins).find_one({"admin_name": config["register"]["admin_users"]["admin_user"], "admin_pass": config["register"]["admin_users"]["admin_pass"]}):
    client.get_col_by_name(client.capif_admins).insert_one({"admin_name": config["register"]["admin_users"]["admin_user"], "admin_pass": config["register"]["admin_users"]["admin_pass"]})
torrespel's avatar
torrespel committed

Jorge Moratinos's avatar
Jorge Moratinos committed
app.config['JWT_ALGORITHM'] = 'RS256'
app.config['JWT_PRIVATE_KEY'] = key_data
torrespel's avatar
torrespel committed
app.config['REGISTRE_SECRET_KEY'] = config["register"]["register_uuid"]
Jorge Moratinos's avatar
Jorge Moratinos committed

app.register_blueprint(register_routes)