Created
March 18, 2026 04:43
-
-
Save REASY/c2759c67288a6c449cf3362e8b35e44e to your computer and use it in GitHub Desktop.
socks5_endpoint_client.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Send Hello to tcp/udp/http endpoints through a SOCKS5 proxy. | |
| Examples: | |
| python3 tools/socks5_endpoint_client.py \ | |
| --proxy 127.0.0.1:1080 \ | |
| tcp:example.com:9000 | |
| python3 tools/socks5_endpoint_client.py \ | |
| --proxy proxy.example.com:1080 \ | |
| --username user \ | |
| --password secret \ | |
| --verbose \ | |
| udp:1.2.3.4:9001 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import http.client | |
| import ipaddress | |
| import json | |
| import socket | |
| import struct | |
| import sys | |
| from dataclasses import dataclass | |
# Default payload for --message.
HELLO_BYTES = b"Hello"

# Timeouts, in seconds. CONNECT_TIMEOUT_S is the default for --timeout;
# TCP_IDLE_TIMEOUT_S is applied to the TCP socket after each received chunk.
CONNECT_TIMEOUT_S = 15.0
READ_TIMEOUT_S = 15.0
TCP_IDLE_TIMEOUT_S = 1.0

# SOCKS5 wire constants: protocol version, command codes, auth methods and
# address-type tags.
SOCKS_VERSION = 0x05

SOCKS_CMD_CONNECT = 0x01
SOCKS_CMD_UDP_ASSOCIATE = 0x03

SOCKS_AUTH_NO_AUTH = 0x00
SOCKS_AUTH_USERNAME_PASSWORD = 0x02

SOCKS_ATYP_IPV4 = 0x01
SOCKS_ATYP_DOMAIN = 0x03
SOCKS_ATYP_IPV6 = 0x04

# Receive-buffer size used for UDP relay datagrams.
UDP_MAX_DATAGRAM = 65_507

# Reply code -> human-readable text; the tuple position is the reply code
# (0x00 through 0x08).
SOCKS_REPLY_MESSAGES = dict(
    enumerate(
        (
            "succeeded",
            "general SOCKS server failure",
            "connection not allowed by ruleset",
            "network unreachable",
            "host unreachable",
            "connection refused",
            "TTL expired",
            "command not supported",
            "address type not supported",
        )
    )
)
@dataclass(frozen=True)
class HostPort:
    """A host (name or IP literal) paired with a port number."""

    host: str
    port: int

    def display(self) -> str:
        """Return ``host:port``, bracketing a host that contains a colon."""
        # Hosts that already start with "[" are left as they are.
        needs_brackets = ":" in self.host and not self.host.startswith("[")
        shown_host = f"[{self.host}]" if needs_brackets else self.host
        return f"{shown_host}:{self.port}"
@dataclass(frozen=True)
class Endpoint:
    """A target endpoint: protocol name, host (name or IP literal) and port."""

    protocol: str
    host: str
    port: int

    def _host_port(self) -> str:
        """Return ``host:port``, bracketing a host that contains a colon."""
        if self.host.startswith("[") or ":" not in self.host:
            return f"{self.host}:{self.port}"
        return f"[{self.host}]:{self.port}"

    def display(self) -> str:
        """Return the endpoint as ``protocol:host:port``."""
        return f"{self.protocol}:{self._host_port()}"

    def http_host_header(self) -> str:
        """Return the ``host:port`` value used for the HTTP Host header."""
        return self._host_port()
class Socks5Error(RuntimeError):
    """Raised for SOCKS5 protocol failures reported by, or detected in, proxy traffic."""
def parse_args() -> argparse.Namespace:
    """Parse and validate the command-line arguments.

    Returns:
        The populated namespace (endpoint, proxy, username, password,
        message, timeout, verbose, tcp_half_close).

    Exits via ``parser.error`` when ``--password`` is given without
    ``--username``, when ``--message`` is empty, or when ``--timeout`` is not
    a positive finite number.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Send 'Hello' through a SOCKS5 proxy to a tcp/udp/http endpoint and "
            "pretty-print the JSON response."
        )
    )
    parser.add_argument(
        "endpoint",
        help="Endpoint in protocol:host:port form, e.g. tcp:example.com:9000",
    )
    parser.add_argument(
        "--proxy",
        required=True,
        help="SOCKS5 proxy in host:port form, e.g. 127.0.0.1:1080",
    )
    parser.add_argument("--username", help="Optional SOCKS5 username")
    parser.add_argument("--password", default="", help="Optional SOCKS5 password")
    parser.add_argument(
        "--message",
        default=HELLO_BYTES.decode("utf-8"),
        help="Message to send as UTF-8 bytes. Default: Hello",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=CONNECT_TIMEOUT_S,
        help=f"Socket timeout in seconds. Default: {CONNECT_TIMEOUT_S:g}",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print SOCKS5 handshake/request bytes to stderr",
    )
    parser.add_argument(
        "--tcp-half-close",
        action="store_true",
        help=(
            "After sending the TCP payload, call shutdown(SHUT_WR). Disabled by "
            "default because some SOCKS5 proxies do not forward TCP half-close "
            "semantics correctly."
        ),
    )
    args = parser.parse_args()
    if args.password and args.username is None:
        parser.error("--password requires --username")
    if not args.message:
        parser.error("--message must not be empty")
    # A timeout of 0 puts sockets in non-blocking mode and negative values are
    # rejected by settimeout(); the chained comparison also rejects NaN and inf.
    if not 0 < args.timeout < float("inf"):
        parser.error("--timeout must be a positive, finite number of seconds")
    return args
def main() -> int:
    """Run the CLI: parse arguments, perform the request and print the result.

    Returns:
        0 after printing the formatted JSON response to stdout, or 1 after
        printing ``error: ...`` to stderr when anything fails.
    """
    args = parse_args()
    try:
        # Endpoint/proxy parsing raises ValueError on malformed input, so it
        # belongs inside the try: the user then gets the same one-line error
        # as for network failures instead of a traceback.
        endpoint = parse_endpoint(args.endpoint)
        proxy = parse_host_port(args.proxy, "--proxy")
        result = request_json_via_socks5(
            proxy=proxy,
            endpoint=endpoint,
            message=args.message.encode("utf-8"),
            timeout=args.timeout,
            username=args.username,
            password=args.password,
            verbose=args.verbose,
            tcp_half_close=args.tcp_half_close,
        )
    except Exception as exc:  # noqa: BLE001 - small CLI tool
        print(f"error: {exc}", file=sys.stderr)
        return 1
    print(result)
    return 0
def request_json_via_socks5(
    *,
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
    tcp_half_close: bool,
) -> str:
    """Dispatch the request to the handler matching ``endpoint.protocol``.

    Returns the handler's formatted JSON text. Raises ``ValueError`` when the
    protocol is not ``tcp``, ``udp`` or ``http``.
    """
    # Arguments every protocol handler takes, in their shared positional order.
    shared = (proxy, endpoint, message, timeout, username, password, verbose)
    protocol = endpoint.protocol
    if protocol == "tcp":
        return request_tcp_via_socks5(*shared, tcp_half_close)
    if protocol == "udp":
        return request_udp_via_socks5(*shared)
    if protocol == "http":
        return request_http_via_socks5(*shared)
    raise ValueError(f"Unsupported protocol: {endpoint.protocol}")
def request_tcp_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
    tcp_half_close: bool,
) -> str:
    """Send *message* to a TCP endpoint through the proxy and return its JSON reply.

    Reads until the peer closes the connection or, once at least one chunk has
    arrived, until the socket stays idle for ``TCP_IDLE_TIMEOUT_S``.

    Raises:
        RuntimeError: if no byte arrives before the timeout, or the peer
            closes without sending anything.
    """
    connection = socks5_connect(
        proxy, endpoint.host, endpoint.port, timeout, username, password, verbose
    )
    with connection as upstream:
        upstream.sendall(message)
        log_bytes(verbose, f"endpoint > tcp payload to {endpoint.display()}", message)
        if tcp_half_close:
            if verbose:
                print("TCP > shutdown(SHUT_WR)", file=sys.stderr)
            upstream.shutdown(socket.SHUT_WR)

        received: list[bytes] = []
        while True:
            try:
                data = upstream.recv(4096)
            except socket.timeout as exc:
                if not received:
                    raise RuntimeError("Timed out waiting for the first TCP response byte.") from exc
                # Data already arrived; an idle timeout marks the end of the reply.
                break
            if not data:
                break
            received.append(data)
            # After data starts flowing, switch to the short idle timeout.
            upstream.settimeout(TCP_IDLE_TIMEOUT_S)
        if not received:
            raise RuntimeError("No TCP response received.")

    response_bytes = b"".join(received)
    log_bytes(verbose, f"endpoint < tcp payload from {endpoint.display()}", response_bytes)
    return format_json_bytes(response_bytes)
def request_udp_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> str:
    """Send *message* as one UDP datagram through the proxy's relay and return the JSON reply.

    The control connection stays open for the whole exchange; both it and the
    UDP socket are closed on exit, including on error.
    """
    with open_socks5_control_connection(proxy, timeout, username, password, verbose) as control_sock:
        # Request the association with a wildcard address of the same family
        # as the control connection, and port 0.
        if control_sock.family == socket.AF_INET6:
            wildcard_host = "::"
        else:
            wildcard_host = "0.0.0.0"
        advertised = socks5_command(
            control_sock,
            SOCKS_CMD_UDP_ASSOCIATE,
            wildcard_host,
            0,
            "UDP ASSOCIATE",
            verbose,
        )
        relay = normalize_udp_relay(advertised, control_sock)
        udp_sock, relay_sockaddr = open_udp_socket(relay, timeout)
        with udp_sock:
            packet = build_socks5_udp_packet(endpoint.host, endpoint.port, message)
            log_bytes(verbose, f"SOCKS5/UDP > relay {relay.display()}", packet)
            udp_sock.sendto(packet, relay_sockaddr)

            response_packet, peer = udp_sock.recvfrom(UDP_MAX_DATAGRAM)
            log_bytes(verbose, f"SOCKS5/UDP < relay {peer[0]}:{peer[1]}", response_packet)

            # The source address carried in the reply header is not used.
            payload = parse_socks5_udp_packet(response_packet)[2]
            log_bytes(verbose, f"endpoint < udp payload from {endpoint.display()}", payload)
            return format_json_bytes(payload)
def request_http_via_socks5(
    proxy: HostPort,
    endpoint: Endpoint,
    message: bytes,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> str:
    """POST *message* to ``/`` on an HTTP endpoint through the proxy and return the JSON body.

    The request is written by hand over the tunnelled socket and the reply is
    parsed with ``http.client.HTTPResponse``.

    Raises:
        RuntimeError: if the response body is empty (or not valid JSON, via
            ``format_json_bytes``).
    """
    with socks5_connect(proxy, endpoint.host, endpoint.port, timeout, username, password, verbose) as upstream:
        request_bytes = (
            f"POST / HTTP/1.1\r\n"
            f"Host: {endpoint.http_host_header()}\r\n"
            f"Accept: application/json\r\n"
            f"Content-Type: text/plain; charset=utf-8\r\n"
            f"Content-Length: {len(message)}\r\n"
            f"Connection: close\r\n"
            f"\r\n"
        ).encode("ascii") + message
        log_bytes(verbose, f"endpoint > http request to {endpoint.display()}", request_bytes)
        upstream.sendall(request_bytes)

        response = http.client.HTTPResponse(upstream, method="POST")
        try:
            response.begin()
            body = response.read()
            status, reason = response.status, response.reason
        finally:
            # HTTPResponse wraps the socket in a file object; close it even
            # when parsing fails so it does not outlive this call.
            response.close()

        if verbose:
            print(
                f"HTTP < {status} {reason} from {endpoint.display()}",
                file=sys.stderr,
            )
        if not body:
            raise RuntimeError(f"HTTP {status} returned an empty response body.")
        log_bytes(verbose, f"endpoint < http body from {endpoint.display()}", body)
        return format_json_bytes(body)
def open_socks5_control_connection(
    proxy: HostPort,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> socket.socket:
    """Connect to the proxy over TCP and complete SOCKS5 method/auth negotiation.

    Returns:
        The connected socket, with *timeout* applied, ready for a SOCKS5
        command. The caller owns it and must close it.

    Raises:
        OSError: if the TCP connection cannot be established.
        Socks5Error: if negotiation fails; the socket is closed first.
    """
    sock = socket.create_connection((proxy.host, proxy.port), timeout=timeout)
    try:
        sock.settimeout(timeout)
        socks5_negotiate_auth(sock, username, password, verbose)
    except BaseException:
        # No caller holds the socket yet, so close it here rather than leak it.
        sock.close()
        raise
    return sock
def socks5_connect(
    proxy: HostPort,
    destination_host: str,
    destination_port: int,
    timeout: float,
    username: str | None,
    password: str,
    verbose: bool,
) -> socket.socket:
    """Open a proxy connection and issue CONNECT to the destination.

    Returns the tunnelled socket, owned by the caller. If the CONNECT step
    fails, the socket is closed and the exception is re-raised.
    """
    sock = open_socks5_control_connection(proxy, timeout, username, password, verbose)
    try:
        bound = socks5_command(
            sock,
            SOCKS_CMD_CONNECT,
            destination_host,
            destination_port,
            "CONNECT",
            verbose,
        )
        if verbose:
            print(f"SOCKS5 CONNECT established via {bound.display()}", file=sys.stderr)
    except Exception:
        sock.close()
        raise
    return sock
def socks5_negotiate_auth(
    sock: socket.socket,
    username: str | None,
    password: str,
    verbose: bool,
) -> None:
    """Send the SOCKS5 greeting and complete whichever auth method the proxy selects.

    "No auth" is always offered; username/password is offered only when
    *username* is not None.

    Raises:
        Socks5Error: on a wrong version byte, rejection of all methods, an
            unsupported selected method, or a username/password request when
            no username was provided.
    """
    offered = [SOCKS_AUTH_NO_AUTH]
    if username is not None:
        offered += [SOCKS_AUTH_USERNAME_PASSWORD]
    greeting = bytes([SOCKS_VERSION, len(offered)] + offered)
    send_frame(sock, greeting, "SOCKS5 > greeting", verbose)

    reply = recv_exact(sock, 2)
    log_bytes(verbose, "SOCKS5 < method selection", reply)
    version, method = reply

    if version != SOCKS_VERSION:
        raise Socks5Error(f"Unexpected SOCKS version in greeting reply: {version!r}")
    if method == 0xFF:
        raise Socks5Error("SOCKS5 proxy rejected all advertised authentication methods.")
    if method == SOCKS_AUTH_NO_AUTH:
        return
    if method != SOCKS_AUTH_USERNAME_PASSWORD:
        raise Socks5Error(f"SOCKS5 proxy selected unsupported auth method 0x{method:02x}.")
    if username is None:
        raise Socks5Error("SOCKS5 proxy requested username/password auth, but no username was provided.")
    negotiate_username_password(sock, username, password, verbose)
def negotiate_username_password(
    sock: socket.socket,
    username: str,
    password: str,
    verbose: bool,
) -> None:
    """Perform the username/password sub-negotiation on *sock*.

    Raises:
        ValueError: if either credential exceeds 255 bytes as UTF-8.
        Socks5Error: if the proxy answers with a non-zero status.
    """
    user_raw = username.encode("utf-8")
    pass_raw = password.encode("utf-8")
    # Each length is sent as a single byte, so neither field may exceed 255.
    if max(len(user_raw), len(pass_raw)) > 255:
        raise ValueError("SOCKS5 username and password must each fit in 255 bytes.")

    request = b"".join(
        (
            b"\x01",
            bytes([len(user_raw)]),
            user_raw,
            bytes([len(pass_raw)]),
            pass_raw,
        )
    )
    send_frame(sock, request, "SOCKS5 > username/password auth", verbose)

    reply = recv_exact(sock, 2)
    log_bytes(verbose, "SOCKS5 < username/password auth", reply)
    status = reply[1]
    if status != 0x00:
        raise Socks5Error(f"SOCKS5 username/password auth failed with status 0x{status:02x}.")
def socks5_command(
    sock: socket.socket,
    command: int,
    destination_host: str,
    destination_port: int,
    label: str,
    verbose: bool,
) -> HostPort:
    """Send one SOCKS5 command and return the address carried in the proxy's reply.

    *label* is used only in log lines and error messages.

    Raises:
        Socks5Error: if the reply code is not 0x00.
    """
    header = bytes((SOCKS_VERSION, command, 0x00))
    # Port 0 is accepted only for UDP ASSOCIATE.
    zero_port_ok = destination_port == 0 and command == SOCKS_CMD_UDP_ASSOCIATE
    address = encode_socks5_address(
        destination_host,
        destination_port,
        allow_zero_port=zero_port_ok,
    )
    send_frame(sock, header + address, f"SOCKS5 > {label}", verbose)

    reply = read_socks5_reply(sock)
    log_bytes(verbose, f"SOCKS5 < {label}", reply)

    reply_code = reply[1]
    if reply_code != 0x00:
        message = SOCKS_REPLY_MESSAGES.get(reply_code, "unknown error")
        raise Socks5Error(f"SOCKS5 {label} failed with 0x{reply_code:02x}: {message}.")

    bound_host, bound_port, _next_offset = decode_socks5_address(reply, 3)
    return HostPort(bound_host, bound_port)
def read_socks5_reply(sock: socket.socket) -> bytes:
    """Read one complete SOCKS5 command reply from *sock* and return its raw bytes.

    The 4-byte header is read first; its address-type byte decides how many
    more bytes follow.

    Raises:
        Socks5Error: on a wrong version byte or an unknown address type.
    """
    header = recv_exact(sock, 4)
    version, atyp = header[0], header[3]
    if version != SOCKS_VERSION:
        raise Socks5Error(f"Unexpected SOCKS version in command reply: {version!r}")

    if atyp == SOCKS_ATYP_DOMAIN:
        # Variable length: one length byte, then the name and a 2-byte port.
        length_byte = recv_exact(sock, 1)
        tail = length_byte + recv_exact(sock, length_byte[0] + 2)
    elif atyp == SOCKS_ATYP_IPV4:
        tail = recv_exact(sock, 6)  # 4 address bytes + 2 port bytes
    elif atyp == SOCKS_ATYP_IPV6:
        tail = recv_exact(sock, 18)  # 16 address bytes + 2 port bytes
    else:
        raise Socks5Error(f"Unsupported SOCKS5 address type in reply: 0x{atyp:02x}.")
    return header + tail
def open_udp_socket(relay: HostPort, timeout: float) -> tuple[socket.socket, tuple[object, ...]]:
    """Create a UDP socket suited to *relay* and return it with the relay's sockaddr.

    The first ``getaddrinfo`` result decides the socket family; *timeout* is
    applied to the new socket.
    """
    candidates = socket.getaddrinfo(relay.host, relay.port, family=0, type=socket.SOCK_DGRAM)
    first = candidates[0]
    family, socktype, proto = first[:3]
    relay_sockaddr = first[4]

    udp_sock = socket.socket(family, socktype, proto)
    udp_sock.settimeout(timeout)
    return udp_sock, relay_sockaddr
def normalize_udp_relay(relay: HostPort, control_sock: socket.socket) -> HostPort:
    """Replace a wildcard relay host with the control connection's peer address.

    Any other relay address is returned unchanged.
    """
    if relay.host not in {"0.0.0.0", "::"}:
        return relay
    # A wildcard host is not a usable destination, so pair the relay port
    # with the address the control connection is talking to.
    proxy_address = control_sock.getpeername()[0]
    return HostPort(proxy_address, relay.port)
def build_socks5_udp_packet(destination_host: str, destination_port: int, payload: bytes) -> bytes:
    """Wrap *payload* in a SOCKS5 UDP request header addressed to the destination."""
    # Three zero bytes: the 2-byte reserved field and the fragment number.
    prefix = b"\x00\x00\x00"
    address = encode_socks5_address(destination_host, destination_port)
    return prefix + address + payload
def parse_socks5_udp_packet(packet: bytes) -> tuple[str, int, bytes]:
    """Split a SOCKS5 UDP datagram into ``(host, port, payload)``.

    Raises:
        Socks5Error: if the packet is shorter than 4 bytes, the reserved
            bytes are non-zero, or the fragment byte is non-zero.
    """
    if len(packet) < 4:
        raise Socks5Error("SOCKS5 UDP reply was too short.")

    reserved, fragment = packet[:2], packet[2]
    if reserved != b"\x00\x00":
        raise Socks5Error("SOCKS5 UDP reply had a non-zero reserved field.")
    if fragment != 0x00:
        raise Socks5Error("SOCKS5 UDP fragmentation is not supported.")

    host, port, payload_start = decode_socks5_address(packet, 3)
    return host, port, packet[payload_start:]
def encode_socks5_address(host: str, port: int, *, allow_zero_port: bool = False) -> bytes:
    """Encode *host* and *port* as a SOCKS5 address: type byte, address, 2-byte port.

    IP literals use the IPv4/IPv6 types; anything else is IDNA-encoded and
    sent as a length-prefixed domain name.

    Raises:
        ValueError: if the port is invalid or the encoded name exceeds 255 bytes.
    """
    validate_port(port, allow_zero_port=allow_zero_port)
    port_field = struct.pack("!H", port)
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: treat it as a domain name.
        name = host.encode("idna")
        if len(name) > 255:
            raise ValueError("SOCKS5 domain names must fit in 255 bytes.") from None
        return bytes([SOCKS_ATYP_DOMAIN, len(name)]) + name + port_field

    atyp = SOCKS_ATYP_IPV4 if ip.version == 4 else SOCKS_ATYP_IPV6
    return bytes([atyp]) + ip.packed + port_field
def decode_socks5_address(buffer: bytes, offset: int) -> tuple[str, int, int]:
    """Decode the SOCKS5 address (type, address, port) that starts at *offset*.

    Returns:
        ``(host, port, next_offset)``, where *next_offset* is the index of the
        first byte after the 2-byte port.

    Raises:
        Socks5Error: if the address type is unknown, the buffer ends before
            the address is complete, or a domain name is not valid IDNA.
    """

    def take(start: int, size: int, what: str) -> tuple[bytes, int]:
        # Slicing past the end would silently return fewer bytes, so check
        # the bounds explicitly and report truncation as a protocol error.
        end = start + size
        if start < 0 or end > len(buffer):
            raise Socks5Error(f"Truncated SOCKS5 address: missing {what}.")
        return buffer[start:end], end

    atyp_field, cursor = take(offset, 1, "address type")
    atyp = atyp_field[0]
    if atyp == SOCKS_ATYP_IPV4:
        raw, cursor = take(cursor, 4, "IPv4 address")
        host = str(ipaddress.IPv4Address(raw))
    elif atyp == SOCKS_ATYP_IPV6:
        raw, cursor = take(cursor, 16, "IPv6 address")
        host = str(ipaddress.IPv6Address(raw))
    elif atyp == SOCKS_ATYP_DOMAIN:
        length_field, cursor = take(cursor, 1, "domain length")
        raw, cursor = take(cursor, length_field[0], "domain name")
        try:
            host = raw.decode("idna")
        except UnicodeError as exc:
            raise Socks5Error("SOCKS5 domain name was not valid IDNA.") from exc
    else:
        raise Socks5Error(f"Unsupported SOCKS5 address type 0x{atyp:02x}.")

    port_field, cursor = take(cursor, 2, "port")
    (port,) = struct.unpack("!H", port_field)
    return host, port, cursor
def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly *count* bytes from *sock*.

    Raises:
        Socks5Error: if the peer closes the connection before *count* bytes
            have arrived.
    """
    collected = bytearray()
    remaining = count
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise Socks5Error("Unexpected EOF while reading from SOCKS5 proxy.")
        collected += piece
        remaining -= len(piece)
    return bytes(collected)
def format_json_bytes(data: bytes) -> str:
    """Decode *data* as UTF-8 JSON and return it re-serialised with 2-space indentation.

    Raises:
        RuntimeError: if the bytes are not valid UTF-8, contain only
            whitespace, or are not valid JSON.
    """
    try:
        stripped = data.decode("utf-8").strip()
    except UnicodeDecodeError as exc:
        raise RuntimeError("Response was not valid UTF-8.") from exc

    if stripped == "":
        raise RuntimeError("Response body was empty.")

    try:
        document = json.loads(stripped)
    except json.JSONDecodeError as exc:
        raise RuntimeError("Response was not valid JSON.") from exc

    # ensure_ascii=False keeps non-ASCII characters as-is instead of \u escapes.
    return json.dumps(document, indent=2, ensure_ascii=False)
def parse_endpoint(spec: str) -> Endpoint:
    """Parse a ``protocol:host:port`` string into an ``Endpoint``.

    The protocol is everything before the first colon and the port everything
    after the last, so the host may itself contain colons.

    Raises:
        ValueError: if the shape is wrong, the protocol is not tcp/udp/http,
            the host is blank, or the port is invalid.
    """
    text = spec.strip()
    protocol_part, first_colon, remainder = text.partition(":")
    host_part, last_colon, port_part = remainder.rpartition(":")
    # Require a non-empty protocol, two distinct colons and a non-empty port.
    if not (protocol_part and first_colon and last_colon and port_part):
        raise ValueError("Endpoint must use protocol:host:port format.")

    protocol = protocol_part.lower()
    if protocol not in {"tcp", "udp", "http"}:
        raise ValueError("Endpoint protocol must be tcp, udp, or http.")

    host = normalize_host_literal(host_part.strip(), "Endpoint host")
    port = parse_port(port_part.strip(), "Endpoint port")
    return Endpoint(protocol, host, port)
def parse_host_port(spec: str, label: str) -> HostPort:
    """Parse ``host:port`` or ``[host]:port`` into a ``HostPort``.

    *label* names the value in error messages.

    Raises:
        ValueError: if the text is empty, not in host:port form, has a blank
            host, or has an invalid port.
    """
    text = spec.strip()
    if not text:
        raise ValueError(f"{label} must not be empty.")

    if text.startswith("["):
        # Bracketed host: the port must follow "]:" directly.
        host, closing, tail = text[1:].partition("]")
        if not closing or not tail.startswith(":"):
            raise ValueError(f"{label} must use host:port form.")
        port_text = tail[1:]
    else:
        # Unbracketed: the last colon separates host from port.
        host, colon, port_text = text.rpartition(":")
        if not (host and colon and port_text):
            raise ValueError(f"{label} must use host:port form.")

    host = normalize_host_literal(host, label)
    port = parse_port(port_text, f"{label} port")
    return HostPort(host, port)
| def normalize_host_literal(host: str, label: str) -> str: | |
| normalized = host.strip() | |
| if normalized.startswith("[") and normalized.endswith("]"): | |
| normalized = normalized[1:-1] | |
| if not normalized: | |
| raise ValueError(f"{label} must not be blank.") | |
| return normalized | |
def parse_port(port_text: str, label: str) -> int:
    """Parse *port_text* as a decimal port number in the range 1-65535.

    Surrounding whitespace is ignored. *label* names the value in the
    "must be a decimal port number" message.

    Raises:
        ValueError: if the text is not made solely of ASCII digits, or the
            number is outside the valid port range.
    """
    digits = port_text.strip()
    # int() alone also accepts signs, underscores ("8_0") and non-ASCII
    # digits, none of which is a decimal port number.
    if not (digits.isascii() and digits.isdigit()):
        raise ValueError(f"{label} must be a decimal port number.")
    try:
        port = int(digits, 10)
    except ValueError as exc:  # e.g. more digits than the interpreter allows
        raise ValueError(f"{label} must be a decimal port number.") from exc
    validate_port(port)
    return port
def validate_port(port: int, *, allow_zero_port: bool = False) -> None:
    """Raise ``ValueError`` unless *port* is in 1-65535 (or 0 when *allow_zero_port* is set)."""
    zero_permitted = allow_zero_port and port == 0
    if zero_permitted or 1 <= port <= 65_535:
        return
    raise ValueError("Port must be between 1 and 65535.")
def send_frame(sock: socket.socket, data: bytes, label: str, verbose: bool) -> None:
    """Log *data* under *label* (when *verbose*) and then send all of it on *sock*."""
    # Logging first means the frame is recorded even if the send raises.
    log_bytes(enabled=verbose, label=label, data=data)
    sock.sendall(data)
def log_bytes(enabled: bool, label: str, data: bytes) -> None:
    """Print ``label: <space-separated two-digit hex bytes>`` to stderr when *enabled*."""
    if enabled:
        rendered = " ".join(map("{:02x}".format, data))
        print(f"{label}: {rendered}", file=sys.stderr)
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment