快速了解 SOCKS5 代理协议
SOCKS5 是最常见的代理服务协议,服务通常使用 1080 端口,支持代理 TCP/UDP 网络协议。协议由 RFC 1928 定义,也可以阅读非官方翻译的中文版。本文主要用于快速入门,省略了协议中不常用的部分。文中提供了协议的部分 Python 代码实现。
连接 SOCKS5 代理服务的 Python 代码如下:
pythonimport ipaddress
import socket
import struct
proxy_host = 'localhost'
proxy_port= 1080
proxy_username = ''
proxy_password = ''
proxy_version = 0x05
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((proxy_host, proxy_port))
协商阶段
客户端与 SOCKS5 代理服务器建立 TCP 连接后,首先需要进行身份验证协商,客户端和服务端就身份验证的方法达成一致。
首先,客户端向服务器发送一条包含协议版本号和可选验证方法的消息:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
NMETHODS | 客户端支持的方法数量 决定 METHODS 的长度 | unsigned char | 1 | 1 - 255 |
METHODS | 客户端支持的方法列表 一个字节对应一个方法 | unsigned char [] | 可变长度 1-255 | 0x02 用户名密码验证 |
发送消息的 Python 代码实现如下:
pythonclient.send(struct.pack('!3B', proxy_version, 1, 0x02))
服务器会在客户端支持的方法中选择一个,并返回消息:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
METHOD | 服务端选择的可用方法 | unsigned char | 1 | 0x00 不需要身份验证 0x02 用户名密码验证 0xFF 无可接受的方法 |
身份验证方法( METHOD )的全部可选值如下:
- 0x00 不需要身份验证( NO AUTHENTICATION REQUIRED )
- 0x01 GSSAPI
- 0x02 用户名密码( USERNAME/PASSWORD )
- 0x03 至 0x7F 由 IANA 分配( IANA ASSIGNED )
- 0x80 至 0xFE 为私人方法保留( RESERVED FOR PRIVATE METHODS )
- 0xFF 无可接受的方法( NO ACCEPTABLE METHODS )
虽然 RFC 要求符合规范的 SOCKS5 协议必须实现 GSSAPI 和用户名密码验证,但是通常只要实现后者即可。如果服务器返回消息中的 METHOD 值为 0xFF ,则意味着协商失败。解析服务器消息的 Python 代码如下:
pythonver, method = struct.unpack('!2B', client.recv(2))
if ver != proxy_version:
client.close()
raise ConnectionError('协议版本不匹配')
if method == 0xFF:
client.close()
raise ConnectionError('协商失败:代理服务器无可接受的方法')
子协商
如果服务器返回的方法不为 0x00 ,则需要进入子协商阶段,即身份验证阶段。本文实现的客户端仅支持用户名密码的验证方式。客户端向服务端发送一条包含用户名和密码的消息:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
ULEN | 用户名长度 | unsigned char | 1 | |
UNAME | 用户名 | unsigned char [] | 可变长度 1-255 | |
PLEN | 密码长度 | unsigned char | 1 | |
PASSWD | 密码 | unsigned char [] | 可变长度 1-255 |
发送消息的 Python 代码实现如下:
pythonraw_username = proxy_username.encode()
raw_password = proxy_password.encode()
username_len = len(raw_username)
password_len = len(raw_password)
client.send(struct.pack(
'!2B%dsB%ds' % (username_len, password_len),
proxy_version,
username_len,
raw_username,
password_len,
raw_password
))
服务器收到消息后返回验证结果:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
STATUS | 验证结果 | unsigned char | 1 | 0x00 成功 |
解析服务器消息的 Python 代码如下:
pythonver, status = struct.unpack('!2B', client.recv(2))
if status != 0x00:
client.close()
raise ConnectionError('身份验证失败')
代理请求
协商验证通过后,客户端向服务器发送代理请求消息:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
CMD | 命令类型 | unsigned char | 1 | 0x01 CONNECT 0x02 BIND 0x03 UDP ASSOCIATE |
RSV | 保留字段 | unsigned char | 1 | 0x00 |
ATYP | 目标地址类型 | unsigned char | 1 | 0x01 IPv4 0x03 域名 0x04 IPv6 |
DST.ADDR | 目标地址 | unsigned char [] | 可变长度 4 (IPv4) 16 (IPv6) 域名另见下表 | |
DST.PORT | 目标端口 | unsigned short | 2 |
如果 ATYP 字段值是 0x03,则 DST.ADDR 的格式为:
字段 | 描述 | 类型 | 长度 |
---|---|---|---|
DLEN | 域名长度 | unsigned char | 1 |
DOMAIN | 域名 | unsigned char [] | 可变长度 1-255 |
CMD 字段的三个值分别表示:
- CONNECT 代理 TCP 流量
- BIND 代理开启监听端口,接收目标地址的连接
- UDP ASSOCIATE 代理 UDP 数据转发
通常只用到 CONNECT 和 UDP ASSOCIATE 请求。如果 SOCKS5 代理服务器具有公网 IP 地址,则可以通过 BIND 请求实现内网穿透。
发送 CONNECT 请求的 Python 代码实现如下:
pythondst_addr = 'www.baidu.com'
dst_port = 443
cmd_connect = 0x01
atyp_ipv4 = 0x01
atyp_ipv6 = 0x04
atyp_domain = 0x03
try:
ip_addr = ipaddress.ip_address(dst_addr)
except ValueError:
# Domain
raw_dst_addr = dst_addr.encode()
dst_addr_len = len(raw_dst_addr)
client.send(struct.pack(
'!5B%dsH' % (dst_addr_len, ),
proxy_version, cmd_connect, 0, atyp_domain, dst_addr_len,
raw_dst_addr, dst_port
))
else:
if isinstance(ip_addr, ipaddress.IPv4Address):
client.send(struct.pack(
'!4B4sH',
proxy_version, cmd_connect, 0, atyp_ipv4,
ip_addr.packed, dst_port
))
else:
# IPv6
client.send(struct.pack(
'!4B16sH',
proxy_version, cmd_connect, 0, atyp_ipv6,
ip_addr.packed, dst_port
))
服务器收到代理请求后会响应如下消息:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
VER | 协议版本号 | unsigned char | 1 | 0x05 |
REP | 服务器应答 | unsigned char | 1 | 0x00 成功 |
RSV | 保留字段 | unsigned char | 1 | 0x00 |
ATYP | 目标地址类型 | unsigned char | 1 | 0x01 IPv4 0x04 IPv6 |
BND.ADDR | 绑定地址 | unsigned char [] | 可变长度 4 (IPv4) 16 (IPv6) | |
BND.PORT | 绑定端口 | unsigned short | 2 |
服务器响应消息中的 REP 字段如果不为 0x00 ,则表示请求失败。不同值的具体含义如下:
- 0x00 成功
- 0x01 常规 SOCKS 服务器故障
- 0x02 规则不允许的链接
- 0x03 网络无法访问
- 0x04 主机无法访问
- 0x05 连接被拒绝
- 0x06 TTL 过期
- 0x07 不支持的命令
- 0x08 不支持的地址类型
解析服务器消息的 Python 代码如下:
pythonver, rep, _, atyp, = struct.unpack('!4B', client.recv(4))
if rep != 0x00:
client.close()
raise ConnectionError('代理请求失败', rep)
if atyp == atyp_ipv4:
bind_addr, bind_port = struct.unpack('!4sH', client.recv(6))
bind_addr = str(ipaddress.IPv4Address(bind_addr))
elif atyp == atyp_ipv6:
bind_addr, bind_port = struct.unpack('!16sH', client.recv(18))
bind_addr = str(ipaddress.IPv6Address(bind_addr))
elif atyp == atyp_domain:
bind_addr_len, = struct.unpack('!B', client.recv(1))
bind_addr = client.recv(bind_addr_len).encode()
bind_port, = struct.unpack('!H', client.recv(2))
else:
raise ConnectionError('服务器响应未知的 ATYP', atyp)
代理中继
服务器返回的 BND.ADDR 和 BND.PORT 是 SOCKS5 代理中继服务器的地址和端口,通常返回的中继服务器即当前 SOCKS5 代理的地址和端口。如果 SOCKS5 代理是以多主机( multi-homed )方式部署的话,则返回的 BND.ADDR 和 BND.PORT 可能与当前代理服务器地址和端口不一致。
CONNECT
需要判断返回的中继地址和端口与当前连接的代理服务器一致。如果一致,则直接在当前会话开始代理流量的数据通信。
pythonif client.getpeername() != (bind_addr, bind_port):
client.close()
raise ConnectionError('SOCKS5 代理客户端不支持中继')
BIND
该命令的作用是告知代理服务器开启端口监听,接受来自目标地址的 TCP 连接,不过有些 SOCKS5 代理未实现此功能。
如果代理服务器接受该命令,会依次向客户端发送两条响应消息:当代理服务成功开启监听套接字后,向客户端发送第一条消息,其中 BND.ADDR 和 BND.PORT 是代理服务器监听的地址和端口;当远程主机成功连接到代理服务器后,客户端会收到第二条消息,其中 BND.ADDR 和 BND.PORT 是远程主机的地址和端口号。
UDP ASSOCIATE
在请求建立 UDP 关联(转发代理)时,如果还不确定目标地址,可以将 DST.ADDR 和 DST.PORT 都设为 0 。当发送 UDP 关联请求的 TCP 连接终止时,UDP 关联也一并终止。
大多数 SOCKS5 代理的实现并没有严格遵守 RFC 的规范。即便 TCP 连接断开后,服务器的 UDP 关联依旧会保持。甚至某些实现连协商握手的步骤都可以省略,客户端直接向 SOCKS5 代理的端口(通常此类服务器的 TCP 和 UDP 复用同一个端口)发送封装好的 UDP 数据包即可实现转发。
通过中继收发的 UDP 数据报都经过封装。封装格式如下:
字段 | 描述 | 类型 | 长度 | 例值 |
---|---|---|---|---|
RSV | 保留字段 | unsigned short | 2 | 0x0000 |
FRAG | 分片编号 | unsigned char | 1 | 0x00 |
ATYP | 目标地址类型 | unsigned char | 1 | 0x01 IPv4 0x03 域名 0x04 IPv6 |
DST.ADDR | 绑定地址 | unsigned char [] | 可变长度 4 (IPv4) 16 (IPv6) 域名见前表 | |
DST.PORT | 绑定端口 | unsigned short | 2 | |
DATA | 载荷数据 | unsigned char [] | 可变长度 |
其中 FRAG 字段用于数据分片重组,一般情况下用不到,设为 0 即可。
示例代码
使用 SOCKS5 代理访问 https://www.baidu.com/ 的 Python 示例:
pythonimport io
import ipaddress
import socket
import struct
import ssl
PROXY_ADDR = ('localhost', 1080)
PROXY_USERNAME = ''
PROXY_PASSWORD = ''
PROXY_VERSION = 0x05
METHOD_USERNAME_PASSWORD = 0x02
METHOD_NO_AUTHENTICATION_REQUIRED = 0x00
METHOD_NO_ACCEPTABLE_METHODS = 0xFF
SUCCESS = 0x00
DST_HOSTNAME = 'www.baidu.com'
DST_PORT = 443
CMD_CONNECT = 0x01
ATYP_IPV4 = 0x01
ATYP_DOMAIN = 0x03
HTTP_REQUEST = ('GET / HTTP/1.1\r\nHOST: %s\r\n\r\n' % (DST_HOSTNAME, )).encode()
socks5_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socks5_client.connect(PROXY_ADDR)
socks5_client.send(struct.pack('!3B', PROXY_VERSION, 1, METHOD_USERNAME_PASSWORD))
ver, method = struct.unpack('!2B', socks5_client.recv(2))
if ver != PROXY_VERSION or method == METHOD_NO_ACCEPTABLE_METHODS:
socks5_client.close()
raise ConnectionError()
if method == METHOD_USERNAME_PASSWORD:
raw_username = PROXY_USERNAME.encode()
raw_password = PROXY_PASSWORD.encode()
username_len = len(raw_username)
password_len = len(raw_password)
socks5_client.send(struct.pack(
'!2B%dsB%ds' % (username_len, password_len),
PROXY_VERSION,
username_len,
raw_username,
password_len,
raw_password
))
_, status = struct.unpack('!2B', socks5_client.recv(2))
if status != SUCCESS:
socks5_client.close()
raise ConnectionError('身份验证失败')
raw_DST_HOSTNAME = DST_HOSTNAME.encode()
DST_HOSTNAME_len = len(raw_DST_HOSTNAME)
socks5_client.send(struct.pack(
'!5B%dsH' % (DST_HOSTNAME_len, ),
PROXY_VERSION, CMD_CONNECT, 0, ATYP_DOMAIN, DST_HOSTNAME_len,
raw_DST_HOSTNAME, DST_PORT
))
_, rep, _, atyp, bind_addr, bind_port = struct.unpack('!4B4sH', socks5_client.recv(10))
if rep != SUCCESS or atyp != ATYP_IPV4:
socks5_client.close()
raise ConnectionError()
bind_addr = str(ipaddress.IPv4Address(bind_addr))
if socks5_client.getpeername() != (bind_addr, bind_port):
socks5_client.close()
raise ConnectionError()
ssl_context = ssl.create_default_context()
with ssl_context.wrap_socket(socks5_client, server_hostname=DST_HOSTNAME) as https_client:
https_client.write(HTTP_REQUEST)
ln = https_client.makefile().readline()
print(ln)
socks5_client.close()