Skip to content

Instantly share code, notes, and snippets.

@REASY
Created March 18, 2026 04:43
Show Gist options
  • Select an option

  • Save REASY/c2759c67288a6c449cf3362e8b35e44e to your computer and use it in GitHub Desktop.

Select an option

Save REASY/c2759c67288a6c449cf3362e8b35e44e to your computer and use it in GitHub Desktop.
socks5_endpoint_client.py
#!/usr/bin/env python3
"""Send Hello to tcp/udp/http endpoints through a SOCKS5 proxy.
Examples:
python3 tools/socks5_endpoint_client.py \
--proxy 127.0.0.1:1080 \
tcp:example.com:9000
python3 tools/socks5_endpoint_client.py \
--proxy proxy.example.com:1080 \
--username user \
--password secret \
--verbose \
udp:1.2.3.4:9001
"""
from __future__ import annotations
import argparse
import http.client
import ipaddress
import json
import socket
import struct
import sys
from dataclasses import dataclass
# Default payload; its UTF-8 decoding is the default value of --message.
HELLO_BYTES = b"Hello"
# Default value of --timeout, in seconds.
CONNECT_TIMEOUT_S = 15.0
# NOTE(review): not referenced anywhere in the visible code - confirm whether it is still needed.
READ_TIMEOUT_S = 15.0
# Socket timeout applied to the TCP tunnel after each received response chunk.
TCP_IDLE_TIMEOUT_S = 1.0
# Protocol version byte used in the greeting and in every command request/reply.
SOCKS_VERSION = 0x05
# Command codes sent in the second byte of a SOCKS5 request.
SOCKS_CMD_CONNECT = 0x01
SOCKS_CMD_UDP_ASSOCIATE = 0x03
# Authentication method identifiers advertised in the greeting.
SOCKS_AUTH_NO_AUTH = 0x00
SOCKS_AUTH_USERNAME_PASSWORD = 0x02
# Address-type (ATYP) markers that prefix every encoded SOCKS5 address.
SOCKS_ATYP_IPV4 = 0x01
SOCKS_ATYP_DOMAIN = 0x03
SOCKS_ATYP_IPV6 = 0x04
# Buffer size passed to recvfrom() when reading the UDP relay's reply.
UDP_MAX_DATAGRAM = 65_507
# Human-readable text for the reply code (second byte) of a SOCKS5 command reply.
SOCKS_REPLY_MESSAGES = {
    0x00: "succeeded",
    0x01: "general SOCKS server failure",
    0x02: "connection not allowed by ruleset",
    0x03: "network unreachable",
    0x04: "host unreachable",
    0x05: "connection refused",
    0x06: "TTL expired",
    0x07: "command not supported",
    0x08: "address type not supported",
}
@dataclass(frozen=True)
class HostPort:
    """Immutable host/port pair."""

    host: str
    port: int

    def display(self) -> str:
        """Return ``host:port``, bracketing a host that contains ``:`` unless it is already bracketed."""
        needs_brackets = ":" in self.host and not self.host.startswith("[")
        shown_host = f"[{self.host}]" if needs_brackets else self.host
        return f"{shown_host}:{self.port}"
@dataclass(frozen=True)
class Endpoint:
    """Immutable target endpoint: a protocol name plus host and port."""

    protocol: str
    host: str
    port: int

    @staticmethod
    def _bracketed(host: str) -> str:
        """Wrap *host* in ``[...]`` when it contains ``:`` and is not already bracketed."""
        if ":" in host and not host.startswith("["):
            return f"[{host}]"
        return host

    def display(self) -> str:
        """Return ``protocol:host:port`` for log and error messages."""
        return f"{self.protocol}:{self._bracketed(self.host)}:{self.port}"

    def http_host_header(self) -> str:
        """Return an ASCII-only ``host:port`` value for an HTTP ``Host`` header.

        Non-ASCII (internationalized) host names are converted with the
        ``idna`` codec so the request head can be encoded as ASCII; pure-ASCII
        hosts are returned unchanged.

        Raises:
            UnicodeError: If a non-ASCII host cannot be IDNA-encoded.
        """
        host = self.host
        if not host.isascii():
            host = host.encode("idna").decode("ascii")
        return f"{self._bracketed(host)}:{self.port}"
class Socks5Error(RuntimeError):
    """Error raised for SOCKS5 protocol failures (bad replies, rejected requests, unexpected EOF)."""
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse and validate the command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The parsed namespace with ``endpoint``, ``proxy``, ``username``,
        ``password``, ``message``, ``timeout``, ``verbose`` and
        ``tcp_half_close`` attributes.

    Invalid combinations are reported through ``parser.error``, which prints
    a usage message and exits the process.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Send 'Hello' through a SOCKS5 proxy to a tcp/udp/http endpoint and "
            "pretty-print the JSON response."
        )
    )
    parser.add_argument(
        "endpoint",
        help="Endpoint in protocol:host:port form, e.g. tcp:example.com:9000",
    )
    parser.add_argument(
        "--proxy",
        required=True,
        help="SOCKS5 proxy in host:port form, e.g. 127.0.0.1:1080",
    )
    parser.add_argument("--username", help="Optional SOCKS5 username")
    parser.add_argument("--password", default="", help="Optional SOCKS5 password")
    parser.add_argument(
        "--message",
        default=HELLO_BYTES.decode("utf-8"),
        help="Message to send as UTF-8 bytes. Default: Hello",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=CONNECT_TIMEOUT_S,
        help=f"Socket timeout in seconds. Default: {CONNECT_TIMEOUT_S:g}",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print SOCKS5 handshake/request bytes to stderr",
    )
    parser.add_argument(
        "--tcp-half-close",
        action="store_true",
        help=(
            "After sending the TCP payload, call shutdown(SHUT_WR). Disabled by "
            "default because some SOCKS5 proxies do not forward TCP half-close "
            "semantics correctly."
        ),
    )
    args = parser.parse_args(argv)
    if args.password and args.username is None:
        parser.error("--password requires --username")
    if not args.message:
        parser.error("--message must not be empty")
    # A zero timeout would put the sockets into non-blocking mode and a
    # negative one is rejected by settimeout(); "not > 0" also rejects NaN.
    if not args.timeout > 0:
        parser.error("--timeout must be greater than zero")
    return args
def main() -> int:
    """Run the CLI: parse arguments, perform the request, print the result.

    Returns:
        ``0`` after printing the formatted JSON response to stdout, or ``1``
        after printing ``error: ...`` to stderr when anything fails.
    """
    args = parse_args()
    try:
        # Endpoint/proxy parsing raises ValueError on malformed input; keep it
        # inside the try so the user gets the same one-line error as for
        # network failures instead of a traceback.
        endpoint = parse_endpoint(args.endpoint)
        proxy = parse_host_port(args.proxy, "--proxy")
        result = request_json_via_socks5(
            proxy=proxy,
            endpoint=endpoint,
            message=args.message.encode("utf-8"),
            timeout=args.timeout,
            username=args.username,
            password=args.password,
            verbose=args.verbose,
            tcp_half_close=args.tcp_half_close,
        )
    except Exception as exc:  # noqa: BLE001 - small CLI tool
        print(f"error: {exc}", file=sys.stderr)
        return 1
    print(result)
    return 0
def request_json_via_socks5(
    *,
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
    tcp_half_close: bool,
) -> str:
    """Dispatch to the tcp/udp/http request helper selected by ``endpoint.protocol``.

    Returns:
        Whatever the selected helper returns (the formatted JSON response).

    Raises:
        ValueError: If the protocol is not ``tcp``, ``udp`` or ``http``.
    """
    protocol = endpoint.protocol
    shared_args = (proxy, endpoint, message, timeout, username, password, verbose)
    if protocol == "tcp":
        # Only the TCP helper takes the half-close switch.
        return request_tcp_via_socks5(*shared_args, tcp_half_close)
    if protocol == "udp":
        return request_udp_via_socks5(*shared_args)
    if protocol == "http":
        return request_http_via_socks5(*shared_args)
    raise ValueError(f"Unsupported protocol: {protocol}")
def request_tcp_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
    tcp_half_close: bool,
) -> str:
    """Send *message* over a SOCKS5 CONNECT tunnel and return the formatted JSON reply.

    Reading stops at EOF, or once data has arrived and the socket then stays
    silent for ``TCP_IDLE_TIMEOUT_S`` seconds.

    Raises:
        RuntimeError: If the first read times out or the peer closes the
            connection without sending anything.
    """
    with socks5_connect(proxy, endpoint.host, endpoint.port, timeout, username, password, verbose) as upstream:
        upstream.sendall(message)
        log_bytes(verbose, f"endpoint > tcp payload to {endpoint.display()}", message)
        if tcp_half_close:
            if verbose:
                print("TCP > shutdown(SHUT_WR)", file=sys.stderr)
            upstream.shutdown(socket.SHUT_WR)

        received: list[bytes] = []
        while True:
            try:
                data = upstream.recv(4096)
            except socket.timeout as exc:
                if received:
                    # Data already arrived; treat the quiet period as end of response.
                    break
                raise RuntimeError("Timed out waiting for the first TCP response byte.") from exc
            if not data:
                break
            received.append(data)
            # Once data is flowing, switch to the short idle timeout.
            upstream.settimeout(TCP_IDLE_TIMEOUT_S)

        if not received:
            raise RuntimeError("No TCP response received.")
        response_bytes = b"".join(received)
        log_bytes(verbose, f"endpoint < tcp payload from {endpoint.display()}", response_bytes)
        return format_json_bytes(response_bytes)
def request_udp_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> str:
    """Send *message* as one datagram through a SOCKS5 UDP ASSOCIATE relay.

    The control connection stays open for the whole exchange and is closed,
    together with the UDP socket, when the function exits.

    Returns:
        The formatted JSON payload of the first datagram received back.
    """
    with open_socks5_control_connection(proxy, timeout, username, password, verbose) as control_sock:
        # Ask for the association with a wildcard address of the control socket's family.
        if control_sock.family == socket.AF_INET6:
            wildcard_host = "::"
        else:
            wildcard_host = "0.0.0.0"
        advertised_relay = socks5_command(
            control_sock,
            SOCKS_CMD_UDP_ASSOCIATE,
            wildcard_host,
            0,
            "UDP ASSOCIATE",
            verbose,
        )
        relay = normalize_udp_relay(advertised_relay, control_sock)
        udp_sock, relay_sockaddr = open_udp_socket(relay, timeout)
        with udp_sock:
            packet = build_socks5_udp_packet(endpoint.host, endpoint.port, message)
            log_bytes(verbose, f"SOCKS5/UDP > relay {relay.display()}", packet)
            udp_sock.sendto(packet, relay_sockaddr)

            response_packet, peer = udp_sock.recvfrom(UDP_MAX_DATAGRAM)
            log_bytes(verbose, f"SOCKS5/UDP < relay {peer[0]}:{peer[1]}", response_packet)

            _, _, payload = parse_socks5_udp_packet(response_packet)
            log_bytes(verbose, f"endpoint < udp payload from {endpoint.display()}", payload)
            return format_json_bytes(payload)
def request_http_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> str:
    """POST *message* to ``/`` on the endpoint through a SOCKS5 CONNECT tunnel.

    The request is written by hand and the reply is parsed with
    ``http.client.HTTPResponse``. The HTTP status is only logged (in verbose
    mode); any status with a non-empty body is passed to the JSON formatter.

    Returns:
        The formatted JSON response body.

    Raises:
        RuntimeError: If the response body is empty.
    """
    with socks5_connect(proxy, endpoint.host, endpoint.port, timeout, username, password, verbose) as upstream:
        request_bytes = (
            f"POST / HTTP/1.1\r\n"
            f"Host: {endpoint.http_host_header()}\r\n"
            f"Accept: application/json\r\n"
            f"Content-Type: text/plain; charset=utf-8\r\n"
            f"Content-Length: {len(message)}\r\n"
            f"Connection: close\r\n"
            f"\r\n"
        ).encode("ascii") + message
        log_bytes(verbose, f"endpoint > http request to {endpoint.display()}", request_bytes)
        upstream.sendall(request_bytes)
        response = http.client.HTTPResponse(upstream)
        try:
            response.begin()
            body = response.read()
        finally:
            # HTTPResponse wraps the socket in a buffered reader that keeps the
            # underlying descriptor alive; close it even when parsing fails so
            # leaving the ``with`` block really releases the connection.
            response.close()
        if verbose:
            print(
                f"HTTP < {response.status} {response.reason} from {endpoint.display()}",
                file=sys.stderr,
            )
        if not body:
            raise RuntimeError(f"HTTP {response.status} returned an empty response body.")
        log_bytes(verbose, f"endpoint < http body from {endpoint.display()}", body)
        return format_json_bytes(body)
def open_socks5_control_connection(
    proxy: HostPort,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> socket.socket:
    """Connect to the SOCKS5 proxy and complete method/auth negotiation.

    Args:
        proxy: Proxy address to connect to.
        timeout: Timeout in seconds for the connect and for later socket I/O.
        username: SOCKS5 username, or ``None`` to offer only "no auth".
        password: SOCKS5 password (used only when a username is given).
        verbose: Whether handshake bytes are logged to stderr.

    Returns:
        The connected, authenticated socket. The caller owns it and must
        close it.

    Raises:
        Whatever connecting or ``socks5_negotiate_auth`` raises; the socket
        is closed before the exception propagates.
    """
    sock = socket.create_connection((proxy.host, proxy.port), timeout=timeout)
    try:
        sock.settimeout(timeout)
        socks5_negotiate_auth(sock, username, password, verbose)
    except Exception:
        # Do not leak the connection when the proxy rejects or garbles the handshake.
        sock.close()
        raise
    return sock
def socks5_connect(
    proxy: HostPort,
    destination_host: str,
    destination_port: int,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> socket.socket:
    """Open a SOCKS5 CONNECT tunnel to the destination and return the tunnel socket.

    If the CONNECT command fails, the proxy connection is closed and the
    exception is re-raised.
    """
    tunnel = open_socks5_control_connection(proxy, timeout, username, password, verbose)
    try:
        bound_address = socks5_command(
            tunnel,
            SOCKS_CMD_CONNECT,
            destination_host,
            destination_port,
            "CONNECT",
            verbose,
        )
        if verbose:
            print(f"SOCKS5 CONNECT established via {bound_address.display()}", file=sys.stderr)
    except Exception:
        tunnel.close()
        raise
    else:
        return tunnel
def socks5_negotiate_auth(
    sock: socket.socket,
    username: str | None,
    password: str,
    verbose: bool,
) -> None:
    """Send the SOCKS5 greeting and complete whichever auth method the proxy selects.

    "No auth" is always offered; username/password is offered only when a
    username is supplied.

    Raises:
        Socks5Error: On a wrong version byte, when every offered method is
            rejected, or when an unsupported or unusable method is selected.
    """
    offered = [SOCKS_AUTH_NO_AUTH]
    if username is not None:
        offered.append(SOCKS_AUTH_USERNAME_PASSWORD)
    send_frame(sock, bytes([SOCKS_VERSION, len(offered)] + offered), "SOCKS5 > greeting", verbose)

    reply = recv_exact(sock, 2)
    log_bytes(verbose, "SOCKS5 < method selection", reply)
    version, selected = reply

    if version != SOCKS_VERSION:
        raise Socks5Error(f"Unexpected SOCKS version in greeting reply: {version!r}")
    if selected == 0xFF:
        raise Socks5Error("SOCKS5 proxy rejected all advertised authentication methods.")
    if selected == SOCKS_AUTH_NO_AUTH:
        return
    if selected != SOCKS_AUTH_USERNAME_PASSWORD:
        raise Socks5Error(f"SOCKS5 proxy selected unsupported auth method 0x{selected:02x}.")
    if username is None:
        raise Socks5Error("SOCKS5 proxy requested username/password auth, but no username was provided.")
    negotiate_username_password(sock, username, password, verbose)
def negotiate_username_password(
    sock: socket.socket,
    username: str,
    password: str,
    verbose: bool,
) -> None:
    """Run the username/password sub-negotiation on an open proxy connection.

    Raises:
        ValueError: If the UTF-8 encoded username or password exceeds 255 bytes.
        Socks5Error: If the proxy answers with a non-zero status byte.
    """
    user_raw = username.encode("utf-8")
    secret_raw = password.encode("utf-8")
    if max(len(user_raw), len(secret_raw)) > 255:
        raise ValueError("SOCKS5 username and password must each fit in 255 bytes.")

    # Frame layout: 0x01, username length, username, password length, password.
    frame = b"".join(
        (
            b"\x01",
            bytes([len(user_raw)]),
            user_raw,
            bytes([len(secret_raw)]),
            secret_raw,
        )
    )
    send_frame(sock, frame, "SOCKS5 > username/password auth", verbose)

    reply = recv_exact(sock, 2)
    log_bytes(verbose, "SOCKS5 < username/password auth", reply)
    status = reply[1]
    if status != 0x00:
        raise Socks5Error(f"SOCKS5 username/password auth failed with status 0x{status:02x}.")
def socks5_command(
    sock: socket.socket,
    command: int,
    destination_host: str,
    destination_port: int,
    label: str,
    verbose: bool,
) -> HostPort:
    """Send one SOCKS5 command and return the address carried in the reply.

    Port ``0`` is accepted only for UDP ASSOCIATE requests.

    Raises:
        Socks5Error: If the reply code is not ``0x00``.
    """
    zero_port_ok = command == SOCKS_CMD_UDP_ASSOCIATE and destination_port == 0
    header = bytes([SOCKS_VERSION, command, 0x00])
    address = encode_socks5_address(
        destination_host,
        destination_port,
        allow_zero_port=zero_port_ok,
    )
    send_frame(sock, header + address, f"SOCKS5 > {label}", verbose)

    reply = read_socks5_reply(sock)
    log_bytes(verbose, f"SOCKS5 < {label}", reply)

    status = reply[1]
    if status == 0x00:
        bound_host, bound_port, _ = decode_socks5_address(reply, 3)
        return HostPort(bound_host, bound_port)
    reason = SOCKS_REPLY_MESSAGES.get(status, "unknown error")
    raise Socks5Error(f"SOCKS5 {label} failed with 0x{status:02x}: {reason}.")
def read_socks5_reply(sock: socket.socket) -> bytes:
    """Read one complete SOCKS5 command reply and return its raw bytes.

    The 4-byte header is read first; its address-type byte decides how many
    more bytes (address plus 2-byte port) follow.

    Raises:
        Socks5Error: On a wrong version byte or an unknown address type.
    """
    header = recv_exact(sock, 4)
    version, atyp = header[0], header[3]
    if version != SOCKS_VERSION:
        raise Socks5Error(f"Unexpected SOCKS version in command reply: {version!r}")

    # Fixed-size tails: address bytes + 2 port bytes.
    fixed_tail_sizes = {SOCKS_ATYP_IPV4: 6, SOCKS_ATYP_IPV6: 18}
    if atyp in fixed_tail_sizes:
        tail = recv_exact(sock, fixed_tail_sizes[atyp])
    elif atyp == SOCKS_ATYP_DOMAIN:
        length_byte = recv_exact(sock, 1)
        tail = length_byte + recv_exact(sock, length_byte[0] + 2)
    else:
        raise Socks5Error(f"Unsupported SOCKS5 address type in reply: 0x{atyp:02x}.")
    return header + tail
def open_udp_socket(relay: HostPort, timeout: float) -> tuple[socket.socket, tuple[object, ...]]:
    """Create a UDP socket suited to the relay address.

    Returns:
        ``(udp_socket, sockaddr)`` built from the first ``getaddrinfo``
        result for the relay; the socket has *timeout* applied.
    """
    candidates = socket.getaddrinfo(relay.host, relay.port, family=0, type=socket.SOCK_DGRAM)
    first = candidates[0]
    family, kind, proto = first[:3]
    relay_sockaddr = first[4]
    datagram_sock = socket.socket(family, kind, proto)
    datagram_sock.settimeout(timeout)
    return datagram_sock, relay_sockaddr
def normalize_udp_relay(relay: HostPort, control_sock: socket.socket) -> HostPort:
    """Replace a wildcard relay host with the control connection's peer host.

    A relay whose host is ``0.0.0.0`` or ``::`` is rewritten to use the host
    from ``control_sock.getpeername()`` with the same port; any other relay
    is returned unchanged.
    """
    if relay.host not in ("0.0.0.0", "::"):
        return relay
    proxy_host = control_sock.getpeername()[0]
    return HostPort(proxy_host, relay.port)
def build_socks5_udp_packet(destination_host: str, destination_port: int, payload: bytes) -> bytes:
    """Prefix *payload* with a SOCKS5 UDP header addressed to the destination.

    The header is three zero bytes followed by the encoded destination address.
    """
    zero_prefix = b"\x00\x00\x00"
    destination = encode_socks5_address(destination_host, destination_port)
    return zero_prefix + destination + payload
def parse_socks5_udp_packet(packet: bytes) -> tuple[str, int, bytes]:
    """Split a SOCKS5 UDP datagram into ``(host, port, payload)``.

    Raises:
        Socks5Error: If the packet is shorter than 4 bytes, its first two
            bytes are not zero, or its third byte (fragment) is not zero.
    """
    if len(packet) < 4:
        raise Socks5Error("SOCKS5 UDP reply was too short.")
    reserved, fragment = packet[:2], packet[2]
    if reserved != b"\x00\x00":
        raise Socks5Error("SOCKS5 UDP reply had a non-zero reserved field.")
    if fragment != 0x00:
        raise Socks5Error("SOCKS5 UDP fragmentation is not supported.")
    source_host, source_port, payload_start = decode_socks5_address(packet, 3)
    return source_host, source_port, packet[payload_start:]
def encode_socks5_address(host: str, port: int, *, allow_zero_port: bool = False) -> bytes:
    """Encode *host* and *port* as a SOCKS5 address: type byte, address, big-endian port.

    IP literals are sent in packed form; anything else is sent as an
    IDNA-encoded domain name with a one-byte length prefix.

    Raises:
        ValueError: If the port is out of range or the encoded domain name
            exceeds 255 bytes.
    """
    validate_port(port, allow_zero_port=allow_zero_port)
    port_field = struct.pack("!H", port)
    try:
        literal = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: treat it as a domain name.
        encoded_name = host.encode("idna")
        if len(encoded_name) > 255:
            raise ValueError("SOCKS5 domain names must fit in 255 bytes.") from None
        return bytes([SOCKS_ATYP_DOMAIN, len(encoded_name)]) + encoded_name + port_field
    atyp = SOCKS_ATYP_IPV4 if isinstance(literal, ipaddress.IPv4Address) else SOCKS_ATYP_IPV6
    return bytes([atyp]) + literal.packed + port_field
def decode_socks5_address(buffer: bytes, offset: int) -> tuple[str, int, int]:
    """Decode the SOCKS5 address that starts at ``buffer[offset]``.

    Args:
        buffer: Raw bytes containing an address-type byte, the address, and
            a 2-byte big-endian port.
        offset: Index of the address-type byte.

    Returns:
        ``(host, port, next_offset)`` where ``next_offset`` is the index of
        the first byte after the port.

    Raises:
        Socks5Error: If the address type is unknown, the buffer ends before
            the address and port are complete, or a domain name cannot be
            decoded.
    """

    def take(start: int, count: int, what: str) -> tuple[bytes, int]:
        # Slicing never fails on short input, so check the bounds explicitly
        # to report truncation instead of decoding a partial field.
        end = start + count
        if end > len(buffer):
            raise Socks5Error(f"Truncated SOCKS5 address: missing {what}.")
        return buffer[start:end], end

    atyp_field, cursor = take(offset, 1, "address type")
    atyp = atyp_field[0]
    if atyp == SOCKS_ATYP_IPV4:
        raw, cursor = take(cursor, 4, "IPv4 address")
        host = str(ipaddress.IPv4Address(raw))
    elif atyp == SOCKS_ATYP_IPV6:
        raw, cursor = take(cursor, 16, "IPv6 address")
        host = str(ipaddress.IPv6Address(raw))
    elif atyp == SOCKS_ATYP_DOMAIN:
        length_field, cursor = take(cursor, 1, "domain length")
        raw, cursor = take(cursor, length_field[0], "domain name")
        try:
            host = raw.decode("idna")
        except UnicodeError as exc:
            raise Socks5Error("SOCKS5 address contained an undecodable domain name.") from exc
    else:
        raise Socks5Error(f"Unsupported SOCKS5 address type 0x{atyp:02x}.")
    port_field, cursor = take(cursor, 2, "port")
    port = struct.unpack("!H", port_field)[0]
    return host, port, cursor
def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly *count* bytes from *sock*, looping over short reads.

    Raises:
        Socks5Error: If the peer closes the connection before *count* bytes arrive.
    """
    pieces = []
    remaining = count
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise Socks5Error("Unexpected EOF while reading from SOCKS5 proxy.")
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)
def format_json_bytes(data: bytes) -> str:
    """Decode *data* as UTF-8 JSON and return it pretty-printed with a 2-space indent.

    Non-ASCII characters are kept as-is in the output.

    Raises:
        RuntimeError: If the data is not valid UTF-8, is blank after
            stripping whitespace, or is not valid JSON.
    """
    try:
        decoded = str(data, "utf-8")
    except UnicodeDecodeError as exc:
        raise RuntimeError("Response was not valid UTF-8.") from exc

    candidate = decoded.strip()
    if candidate == "":
        raise RuntimeError("Response body was empty.")

    try:
        document = json.loads(candidate)
    except json.JSONDecodeError as exc:
        raise RuntimeError("Response was not valid JSON.") from exc
    return json.dumps(document, indent=2, ensure_ascii=False)
def parse_endpoint(spec: str) -> Endpoint:
    """Parse a ``protocol:host:port`` string into an ``Endpoint``.

    The protocol is everything before the first ``:`` (lower-cased), the
    port everything after the last ``:``, and the host whatever lies between.

    Raises:
        ValueError: If the shape is wrong, the protocol is not tcp/udp/http,
            or the host or port is invalid.
    """
    text = spec.strip()
    scheme, first_colon, remainder = text.partition(":")
    host_text, last_colon, port_text = remainder.rpartition(":")
    # Needs a non-empty protocol, two distinct separators and a non-empty port.
    if not (first_colon and scheme and last_colon and port_text):
        raise ValueError("Endpoint must use protocol:host:port format.")

    protocol = scheme.lower()
    if protocol not in {"tcp", "udp", "http"}:
        raise ValueError("Endpoint protocol must be tcp, udp, or http.")

    host = normalize_host_literal(host_text.strip(), "Endpoint host")
    port = parse_port(port_text.strip(), "Endpoint port")
    return Endpoint(protocol, host, port)
def parse_host_port(spec: str, label: str) -> HostPort:
    """Parse ``host:port`` or ``[host]:port`` into a ``HostPort``.

    Args:
        spec: The text to parse.
        label: Name of the option being parsed, used in error messages.

    Raises:
        ValueError: If the text is empty, is not in host:port form, or has
            an invalid host or port.
    """
    text = spec.strip()
    if not text:
        raise ValueError(f"{label} must not be empty.")

    if text[0] == "[":
        # Bracketed form: the closing bracket must be followed directly by ":".
        closing = text.find("]")
        if closing == -1 or text[closing + 1:closing + 2] != ":":
            raise ValueError(f"{label} must use host:port form.")
        host_text = text[1:closing]
        port_text = text[closing + 2:]
    else:
        # Unbracketed form: split at the last colon; both sides must be non-empty.
        host_text, colon, port_text = text.rpartition(":")
        if not colon or not host_text or not port_text:
            raise ValueError(f"{label} must use host:port form.")

    host = normalize_host_literal(host_text, label)
    port = parse_port(port_text, f"{label} port")
    return HostPort(host, port)
def normalize_host_literal(host: str, label: str) -> str:
    """Strip surrounding whitespace and one pair of enclosing ``[...]`` from *host*.

    Raises:
        ValueError: If nothing is left after stripping.
    """
    candidate = host.strip()
    is_bracketed = candidate.startswith("[") and candidate.endswith("]")
    if is_bracketed:
        candidate = candidate[1:-1]
    if candidate == "":
        raise ValueError(f"{label} must not be blank.")
    return candidate
def parse_port(port_text: str, label: str) -> int:
    """Convert *port_text* to an integer port and range-check it.

    Raises:
        ValueError: If the text is not a base-10 integer (message prefixed
            with *label*) or the value fails ``validate_port``.
    """
    try:
        value = int(port_text, 10)
    except ValueError as exc:
        raise ValueError(f"{label} must be a decimal port number.") from exc
    else:
        validate_port(value)
        return value
def validate_port(port: int, *, allow_zero_port: bool = False) -> None:
    """Check that *port* lies in 1..65535, or is 0 when *allow_zero_port* is set.

    Raises:
        ValueError: If the port is outside the accepted range.
    """
    zero_permitted = allow_zero_port and port == 0
    if zero_permitted or 1 <= port <= 65_535:
        return
    raise ValueError("Port must be between 1 and 65535.")
def send_frame(sock: socket.socket, data: bytes, label: str, verbose: bool) -> None:
    """Log *data* under *label* (when *verbose*) and then send all of it on *sock*."""
    frame = data
    log_bytes(verbose, label, frame)
    sock.sendall(frame)
def log_bytes(enabled: bool, label: str, data: bytes) -> None:
    """Print ``label: <hex dump>`` to stderr when *enabled*; otherwise do nothing.

    The dump is lowercase two-digit hex values separated by single spaces.
    """
    if enabled:
        rendered = " ".join(map("{:02x}".format, data))
        print(f"{label}: {rendered}", file=sys.stderr)
# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment