Loading services/helper/helper_service/app.py +33 −10 Original line number Original line Diff line number Diff line Loading @@ -56,10 +56,35 @@ csr_request = dump_certificate_request(FILETYPE_PEM, req) private_key = dump_privatekey(FILETYPE_PEM, key) private_key = dump_privatekey(FILETYPE_PEM, key) # Save superadmin private key # Save superadmin private key key_file = open("certs/superadmin.key", 'wb+') CERTS_DIR = Path(__file__).resolve().parent / "certs" key_file.write(bytes(private_key)) logger.info(f"Superadmin key:\n{private_key}") try: key_file.close() # If it exists but it's not a directory -> fail early with a clear error if CERTS_DIR.exists() and not CERTS_DIR.is_dir(): raise RuntimeError(f"'certs' exists but is not a directory: {CERTS_DIR}") CERTS_DIR.mkdir(parents=True, exist_ok=True) # Quick sanity check: can we write there? if not os.access(CERTS_DIR, os.W_OK): raise PermissionError(f"No write permission on certs dir: {CERTS_DIR}") key_path = CERTS_DIR / "superadmin.key" with open(key_path, "wb") as f: f.write(private_key) # Restrict permissions (best-effort; may be limited by FS/umask) try: os.chmod(key_path, 0o600) except Exception as e: logger.warning(f"Could not chmod {key_path} to 600: {e}") logger.info(f"Superadmin key written to {key_path}") except Exception: logger.exception(f"Failed to write superadmin key under {CERTS_DIR}") raise # Request superadmin certificate # Request superadmin certificate url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) Loading @@ -76,9 +101,8 @@ superadmin_cert = json.loads(response.text)['data']['certificate'] logger.info(f"Superadmin Cert:\n{superadmin_cert}") logger.info(f"Superadmin Cert:\n{superadmin_cert}") # Save the superadmin certificate # Save the superadmin certificate cert_file = open("certs/superadmin.crt", 'wb') with open(CERTS_DIR / "superadmin.crt", "wb") as cert_file: cert_file.write(bytes(superadmin_cert, 'utf-8')) cert_file.write(superadmin_cert.encode("utf-8")) cert_file.close() url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca" url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca" headers = { headers = { Loading @@ -89,9 +113,8 @@ response = requests.request("GET", url, headers=headers, verify = config["ca_fac ca_root = json.loads(response.text)['data']['data']['ca'] ca_root = json.loads(response.text)['data']['data']['ca'] logger.info(f"CA root:\n{ca_root}") logger.info(f"CA root:\n{ca_root}") cert_file = open("certs/ca_root.crt", 'wb') with open(CERTS_DIR / "ca_root.crt", "wb") as cert_file: cert_file.write(bytes(ca_root, 'utf-8')) cert_file.write(ca_root.encode("utf-8")) cert_file.close() package_paths = config.get("package_paths", {}) package_paths = config.get("package_paths", {}) Loading Loading
services/helper/helper_service/app.py +33 −10 Original line number Original line Diff line number Diff line Loading @@ -56,10 +56,35 @@ csr_request = dump_certificate_request(FILETYPE_PEM, req) private_key = dump_privatekey(FILETYPE_PEM, key) private_key = dump_privatekey(FILETYPE_PEM, key) # Save superadmin private key # Save superadmin private key key_file = open("certs/superadmin.key", 'wb+') CERTS_DIR = Path(__file__).resolve().parent / "certs" key_file.write(bytes(private_key)) logger.info(f"Superadmin key:\n{private_key}") try: key_file.close() # If it exists but it's not a directory -> fail early with a clear error if CERTS_DIR.exists() and not CERTS_DIR.is_dir(): raise RuntimeError(f"'certs' exists but is not a directory: {CERTS_DIR}") CERTS_DIR.mkdir(parents=True, exist_ok=True) # Quick sanity check: can we write there? if not os.access(CERTS_DIR, os.W_OK): raise PermissionError(f"No write permission on certs dir: {CERTS_DIR}") key_path = CERTS_DIR / "superadmin.key" with open(key_path, "wb") as f: f.write(private_key) # Restrict permissions (best-effort; may be limited by FS/umask) try: os.chmod(key_path, 0o600) except Exception as e: logger.warning(f"Could not chmod {key_path} to 600: {e}") logger.info(f"Superadmin key written to {key_path}") except Exception: logger.exception(f"Failed to write superadmin key under {CERTS_DIR}") raise # Request superadmin certificate # Request superadmin certificate url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) url = 'http://{}:{}/v1/pki_int/sign/my-ca'.format(config["ca_factory"]["url"], config["ca_factory"]["port"]) Loading @@ -76,9 +101,8 @@ superadmin_cert = json.loads(response.text)['data']['certificate'] logger.info(f"Superadmin Cert:\n{superadmin_cert}") logger.info(f"Superadmin Cert:\n{superadmin_cert}") # Save the superadmin certificate # Save the superadmin certificate cert_file = open("certs/superadmin.crt", 'wb') with open(CERTS_DIR / "superadmin.crt", "wb") as cert_file: cert_file.write(bytes(superadmin_cert, 'utf-8')) cert_file.write(superadmin_cert.encode("utf-8")) cert_file.close() url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca" url = f"http://{config['ca_factory']['url']}:{config['ca_factory']['port']}/v1/secret/data/ca" headers = { headers = { Loading @@ -89,9 +113,8 @@ response = requests.request("GET", url, headers=headers, verify = config["ca_fac ca_root = json.loads(response.text)['data']['data']['ca'] ca_root = json.loads(response.text)['data']['data']['ca'] logger.info(f"CA root:\n{ca_root}") logger.info(f"CA root:\n{ca_root}") cert_file = open("certs/ca_root.crt", 'wb') with open(CERTS_DIR / "ca_root.crt", "wb") as cert_file: cert_file.write(bytes(ca_root, 'utf-8')) cert_file.write(ca_root.encode("utf-8")) cert_file.close() package_paths = config.get("package_paths", {}) package_paths = config.get("package_paths", {}) Loading