# Newer
# Older
from controllers.register_controller import register_routes
from OpenSSL.crypto import PKey, TYPE_RSA, X509Req, dump_certificate_request, FILETYPE_PEM, dump_privatekey
import requests
import json
from config import Config
# Stavros-Anastasios Charismiadis committed
from db.db import MongoDatabse
# Create a superadmin CSR and keys
# A fresh 2048-bit RSA key pair is generated on every start-up.
key = PKey()
key.generate_key(TYPE_RSA, 2048)

req = X509Req()
# Fill in the subject through a single handle, one field at a time, in a
# fixed order.
_subject = req.get_subject()
for _field, _value in (
    ('O', 'Telefonica I+D'),
    ('OU', 'Innovation'),
    ('L', 'Madrid'),
    ('ST', 'Madrid'),
    ('C', 'ES'),
    ('emailAddress', 'inno@tid.es'),
):
    setattr(_subject, _field, _value)

# The request carries the public key and is self-signed with SHA-256.
req.set_pubkey(key)
req.sign(key, 'sha256')

# PEM-encode both artefacts; they are consumed further down in this script.
private_key = dump_privatekey(FILETYPE_PEM, key)
csr_request = dump_certificate_request(FILETYPE_PEM, req)
# Save superadmin private key
# The context manager closes the file even when the write fails. The PEM dump
# is already a bytes object, so it is written as-is, and write-only mode is
# enough because nothing is read back.
with open("certs/superadmin.key", 'wb') as key_file:
    key_file.write(private_key)
# Request superadmin certificate
# The CSR generated above is posted to the CA factory's signing endpoint.
url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"])
# The token is used directly: nesting double quotes inside a double-quoted
# f-string is a SyntaxError on interpreters older than Python 3.12.
headers = {'X-Vault-Token': config["ca_factory"]["token"]}
data = {
    'format': 'pem_bundle',
    'ttl': '43000h',
    'csr': csr_request,
    'common_name': "superadmin",
}
response = requests.request(
    "POST",
    url,
    headers=headers,
    data=data,
    verify=config["ca_factory"].get("verify", False),
)
# Fail with the HTTP error itself rather than a KeyError on a body that has
# no 'data' entry.
response.raise_for_status()
superadmin_cert = response.json()['data']['certificate']
# Save the superadmin certificate
# The context manager guarantees the handle is closed even if the write raises.
with open("certs/superadmin.crt", 'wb') as cert_file:
    cert_file.write(bytes(superadmin_cert, 'utf-8'))
# Fetch the CA root certificate from the CA factory and store it on disk
url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca"
headers = {
    'X-Vault-Token': config['ca_factory']['token']
}
response = requests.request(
    "GET",
    url,
    headers=headers,
    verify=config["ca_factory"].get("verify", False),
)
# Surface HTTP failures directly instead of failing later on a missing key.
response.raise_for_status()
ca_root = response.json()['data']['data']['ca']

# The context manager closes the file even when the write fails.
with open("certs/ca_root.crt", 'wb') as cert_file:
    cert_file.write(bytes(ca_root, 'utf-8'))
# Request CAPIF private key to encode the CAPIF token
url = 'http://{}:{}/v1/secret/data/server_cert/private'.format(config["ca_factory"]["url"], config["ca_factory"]["port"])
# The token is used directly: nesting double quotes inside a double-quoted
# f-string is a SyntaxError on interpreters older than Python 3.12.
headers = {'X-Vault-Token': config["ca_factory"]["token"]}
response = requests.request(
    "GET",
    url,
    headers=headers,
    verify=config["ca_factory"].get("verify", False),
)
# Surface HTTP failures directly instead of failing later on a missing key.
response.raise_for_status()
key_data = response.json()["data"]["data"]["key"]
# Create an Admin in the Admin Collection
# The initial admin is inserted only when no document with the configured
# name/password pair exists yet.
client = MongoDatabse()
_admin_settings = config["register"]["admin_users"]
_admin_doc = {
    "admin_name": _admin_settings["admin_user"],
    "admin_pass": _admin_settings["admin_pass"],
}
_admins = client.get_col_by_name(client.capif_admins)
if not _admins.find_one(_admin_doc):
    # Only the name is logged; the password must not end up in stdout or logs.
    print(f'Inserting Initial Admin admin_name: {_admin_doc["admin_name"]}')
    # A copy is inserted so the lookup document itself is never modified.
    _admins.insert_one(dict(_admin_doc))
# Application settings read from app.config elsewhere: the JWT signing
# algorithm, the private key fetched above, and the registration secret taken
# from the "register" configuration section.
_settings = app.config
_settings['JWT_ALGORITHM'] = 'RS256'
_settings['JWT_PRIVATE_KEY'] = key_data
_settings['REGISTRE_SECRET_KEY'] = config["register"]["register_uuid"]

# Attach the registration routes to the application.
app.register_blueprint(register_routes)