快速了解 SOCKS5 代理协议

SOCKS5 是最常见的代理服务协议,服务通常使用 1080 端口,支持代理 TCP/UDP 网络协议。协议由 RFC 1928 定义,也可以阅读非官方翻译的中文版。本文主要用于快速入门,省略了协议中不常用的部分。文中提供了协议的部分 Python 代码实现。

sequenceDiagram title 建立 SOCKS5 代理的流程 participant client as 客户端 participant proxy as SOCKS5 代理 participant dest as 目标服务器 client->>proxy: 连接代理服务器 proxy-->>client: 连接成功 client->>proxy: 协商认证方式 proxy-->>client: 确认认证方式 opt 需要身份验证 client->>proxy: 身份验证信息 proxy-->>client: 验证通过 end client->>proxy: 发送代理请求 proxy->>+dest: 连接目标服务器 dest-->>proxy: 连接成功 activate proxy proxy-->>client: 请求成功 loop 开始通信 client->>proxy: 发送数据 proxy->>dest: 发送数据 dest-->>proxy: 接收数据 proxy-->>client: 接收数据 end deactivate proxy deactivate dest

连接 SOCKS5 代理服务的 Python 代码如下:

pythonimport ipaddress
import socket
import struct

proxy_host = 'localhost'
proxy_port= 1080
proxy_username = ''
proxy_password = ''
proxy_version = 0x05

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((proxy_host, proxy_port))

协商阶段

客户端与 SOCKS5 代理服务器建立 TCP 连接后,首先需要进行身份验证协商,客户端和服务端就身份验证的方法达成一致。

首先,客户端向服务器发送一条包含协议版本号和可选验证方法的消息:

表1
字段描述类型长度例值
VER协议版本号unsigned char10x05
NMETHODS客户端支持的方法数量
决定 METHODS 的长度
unsigned char11 - 255
METHODS客户端支持的方法列表
一个字节对应一个方法
unsigned char []可变长度
1-255
0x02 用户名密码验证

发送消息的 Python 代码实现如下:

pythonclient.send(struct.pack('!3B', proxy_version, 1, 0x02))

服务器会在客户端支持的方法中选择一个,并返回消息:

表2
字段描述类型长度例值
VER协议版本号unsigned char10x05
METHOD服务端选择的可用方法unsigned char10x00 不需要身份验证
0x02 用户名密码验证
0xFF 无可接受的方法

身份验证方法( METHOD )的全部可选值如下:

虽然 RFC 要求符合规范的 SOCKS5 协议必须实现 GSSAPI 和用户名密码验证,但是通常只要实现后者即可。如果服务器返回消息中的 METHOD 值为 0xFF ,则意味着协商失败。解析服务器消息的 Python 代码如下:

pythonver, method = struct.unpack('!2B', client.recv(2))

if ver != proxy_version:
    client.close()
    raise ConnectionError('协议版本不匹配')

if method == 0xFF:
    client.close()
    raise ConnectionError('协商失败:代理服务器无可接受的方法')

子协商

如果服务器返回的方法不为 0x00 ,则需要进入子协商阶段,即身份验证阶段。本文实现的客户端仅支持用户名密码的验证方式。客户端向服务端发送一条包含用户名和密码的消息:

表3
字段描述类型长度例值
VER协议版本号unsigned char10x05
ULEN用户名长度unsigned char1
UNAME用户名unsigned char []可变长度
1-255
PLEN密码长度unsigned char1
PASSWD密码unsigned char []可变长度
1-255

发送消息的 Python 代码实现如下:

pythonraw_username = proxy_username.encode()
raw_password = proxy_password.encode()
username_len = len(raw_username)
password_len = len(raw_password)

client.send(struct.pack(
    '!2B%dsB%ds' % (username_len, password_len), 
    proxy_version, 
    username_len,
    raw_username,
    password_len,
    raw_password
))

服务器收到消息后返回验证结果:

表4
字段描述类型长度例值
VER协议版本号unsigned char10x05
STATUS验证结果unsigned char10x00 成功

解析服务器消息的 Python 代码如下:

pythonver, status = struct.unpack('!2B', client.recv(2))

if status != 0x00:
    client.close()
    raise ConnectionError('身份验证失败')

代理请求

协商验证通过后,客户端向服务器发送代理请求消息:

表5
字段描述类型长度例值
VER协议版本号unsigned char10x05
CMD命令类型unsigned char10x01 CONNECT
0x02 BIND
0x03 UDP ASSOCIATE
RSV保留字段unsigned char10x00
ATYP目标地址类型unsigned char10x01 IPv4
0x03 域名
0x04 IPv6
DST.ADDR目标地址unsigned char []可变长度
4 (IPv4)
16 (IPv6)
域名另见下表
DST.PORT目标端口unsigned short2

如果 ATYP 字段值是 0x03,则 DST.ADDR 的格式为:

表6
字段描述类型长度
DLEN域名长度unsigned char1
DOMAIN域名unsigned char []可变长度
1-255

CMD 字段的三个值分别表示:

通常只用到 CONNECT 和 UDP ASSOCIATE 请求。如果 SOCKS5 代理服务器具有公网 IP 地址,则可以通过 BIND 请求实现内网穿透。

发送 CONNECT 请求的 Python 代码实现如下:

pythondst_addr = 'www.baidu.com'
dst_port = 443
cmd_connect = 0x01
atyp_ipv4 = 0x01
atyp_ipv6 = 0x04
atyp_domain = 0x03

try:
    ip_addr = ipaddress.ip_address(dst_addr)
except ValueError:
    # Domain
    raw_dst_addr = dst_addr.encode()
    dst_addr_len = len(raw_dst_addr)
    client.send(struct.pack(
        '!5B%dsH' % (dst_addr_len, ),
        proxy_version, cmd_connect, 0, atyp_domain, dst_addr_len,
        raw_dst_addr, dst_port
    ))
else:
    if isinstance(ip_addr, ipaddress.IPv4Address):
        client.send(struct.pack(
            '!4B4sH', 
            proxy_version, cmd_connect, 0, atyp_ipv4,
            ip_addr.packed, dst_port
        ))
    else:
        # IPv6
        client.send(struct.pack(
            '!4B16sH', 
            proxy_version, cmd_connect, 0, atyp_ipv6,
            ip_addr.packed, dst_port
        ))

服务器收到代理请求后会响应如下消息:

表7
字段描述类型长度例值
VER协议版本号unsigned char10x05
REP服务器应答unsigned char10x00 成功
RSV保留字段unsigned char10x00
ATYP目标地址类型unsigned char10x01 IPv4
0x04 IPv6
BND.ADDR绑定地址unsigned char []可变长度
4 (IPv4)
16 (IPv6)
BND.PORT绑定端口unsigned short2

服务器响应消息中的 REP 字段如果不为 0x00 ,则表示请求失败。不同值的具体含义如下:

解析服务器消息的 Python 代码如下:

pythonver, rep, _, atyp,  = struct.unpack('!4B', client.recv(4))

if rep != 0x00:
    client.close()
    raise ConnectionError('代理请求失败', rep)

if atyp == atyp_ipv4:
    bind_addr, bind_port = struct.unpack('!4sH', client.recv(6))
    bind_addr = str(ipaddress.IPv4Address(bind_addr))
elif atyp == atyp_ipv6:
    bind_addr, bind_port = struct.unpack('!16sH', client.recv(18))
    bind_addr = str(ipaddress.IPv6Address(bind_addr))
elif atyp == atyp_domain:
    bind_addr_len, = struct.unpack('!B', client.recv(1))
    bind_addr = client.recv(bind_addr_len).encode()
    bind_port, = struct.unpack('!H', client.recv(2))
else:
    raise ConnectionError('服务器响应未知的 ATYP', atyp)

代理中继

服务器返回的 BND.ADDR 和 BND.PORT 是 SOCKS5 代理中继服务器的地址和端口,通常返回的中继服务器即当前 SOCKS5 代理的地址和端口。如果 SOCKS5 代理是以多主机( multi-homed )方式部署的话,则返回的 BND.ADDR 和 BND.PORT 可能与当前代理服务器地址和端口不一致。

CONNECT

需要判断返回的中继地址和端口与当前连接的代理服务器一致。如果一致,则直接在当前会话开始代理流量的数据通信。

pythonif client.getpeername() != (bind_addr, bind_port):
    client.close()
    raise ConnectionError('SOCKS5 代理客户端不支持中继')

BIND

该命令的作用是告知代理服务器开启端口监听,接受来自目标地址的 TCP 连接,不过有些 SOCKS5 代理未实现此功能。

如果代理服务器接受该命令,会依次向客户端发送两条响应消息:当代理服务成功开启监听套接字后,向客户端发送第一条消息,其中 BND.ADDR 和 BND.PORT 是代理服务器监听的地址和端口;当远程主机成功连接到代理服务器后,客户端会收到第二条消息,其中 BND.ADDR 和 BND.PORT 是远程主机的地址和端口号。

UDP ASSOCIATE

在请求建立 UDP 关联(转发代理)时,如果还不确定目标地址,可以将 DST.ADDR 和 DST.PORT 都设为 0 。当发送 UDP 关联请求的 TCP 连接终止时,UDP 关联也一并终止。

大多数 SOCKS5 代理的实现并没有严格遵守 RFC 的规范。即便 TCP 连接断开后,服务器的 UDP 关联依旧会保持。甚至某些实现连协商握手的步骤都可以省略,客户端直接向 SOCKS5 代理的端口(通常此类服务器的 TCP 和 UDP 复用同一个端口)发送封装好的 UDP 数据包即可实现转发。

通过中继收发的 UDP 数据报都经过封装。封装格式如下:

表8
字段描述类型长度例值
RSV保留字段unsigned short20x0000
FRAG分片编号unsigned char10x00
ATYP目标地址类型unsigned char10x01 IPv4
0x03 域名
0x04 IPv6
DST.ADDR绑定地址unsigned char []可变长度
4 (IPv4)
16 (IPv6)
域名见前表
DST.PORT绑定端口unsigned short2
DATA载荷数据unsigned char []可变长度

其中 FRAG 字段用于数据分片重组,一般情况下用不到,设为 0 即可。

示例代码

使用 SOCKS5 代理访问 https://www.baidu.com/ 的 Python 示例:

pythonimport io
import ipaddress
import socket
import struct
import ssl

PROXY_ADDR = ('localhost', 1080)
PROXY_USERNAME = ''
PROXY_PASSWORD = ''
PROXY_VERSION = 0x05
METHOD_USERNAME_PASSWORD = 0x02
METHOD_NO_AUTHENTICATION_REQUIRED = 0x00
METHOD_NO_ACCEPTABLE_METHODS = 0xFF
SUCCESS = 0x00
DST_HOSTNAME = 'www.baidu.com'
DST_PORT = 443
CMD_CONNECT = 0x01
ATYP_IPV4 = 0x01
ATYP_DOMAIN = 0x03
HTTP_REQUEST = ('GET / HTTP/1.1\r\nHOST: %s\r\n\r\n' % (DST_HOSTNAME, )).encode()

socks5_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socks5_client.connect(PROXY_ADDR)
socks5_client.send(struct.pack('!3B', PROXY_VERSION, 1, METHOD_USERNAME_PASSWORD))

ver, method = struct.unpack('!2B', socks5_client.recv(2))
if ver != PROXY_VERSION or method == METHOD_NO_ACCEPTABLE_METHODS:
    socks5_client.close()
    raise ConnectionError()

if method == METHOD_USERNAME_PASSWORD:
    raw_username = PROXY_USERNAME.encode()
    raw_password = PROXY_PASSWORD.encode()
    username_len = len(raw_username)
    password_len = len(raw_password)
    socks5_client.send(struct.pack(
        '!2B%dsB%ds' % (username_len, password_len), 
        PROXY_VERSION, 
        username_len,
        raw_username,
        password_len,
        raw_password
    ))
    _, status = struct.unpack('!2B', socks5_client.recv(2))
    if status != SUCCESS:
        socks5_client.close()
        raise ConnectionError('身份验证失败')

raw_DST_HOSTNAME = DST_HOSTNAME.encode()
DST_HOSTNAME_len = len(raw_DST_HOSTNAME)
socks5_client.send(struct.pack(
    '!5B%dsH' % (DST_HOSTNAME_len, ),
    PROXY_VERSION, CMD_CONNECT, 0, ATYP_DOMAIN, DST_HOSTNAME_len,
    raw_DST_HOSTNAME, DST_PORT
))
_, rep, _, atyp, bind_addr, bind_port  = struct.unpack('!4B4sH', socks5_client.recv(10))
if rep != SUCCESS or atyp != ATYP_IPV4:
    socks5_client.close()
    raise ConnectionError()
bind_addr = str(ipaddress.IPv4Address(bind_addr))
if socks5_client.getpeername() != (bind_addr, bind_port):
    socks5_client.close()
    raise ConnectionError()

ssl_context = ssl.create_default_context()
with ssl_context.wrap_socket(socks5_client, server_hostname=DST_HOSTNAME) as https_client:
    https_client.write(HTTP_REQUEST)
    ln = https_client.makefile().readline()
    print(ln)

socks5_client.close()