asyncio.DatagramProtocol stops responding after receiving an error
The official Python documentation provides an example of a UDP echo server built with asyncio:
```python
import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())
```
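However, once this UDP server hits a runtime error, it stops responding and cannot continue working. Normally, sending a UDP datagram does not raise an exception even if nothing is listening at the destination address and port. On Windows, though, sending a UDP datagram to a local port with no listener triggers ERROR_PORT_UNREACHABLE (code 0x4D2, which is 1234 in decimal), described as "No service is operating at the destination network endpoint on the remote system." An issue has already been filed on GitHub as #91227, titled "asyncio proactor udp transport stops responding after send to port that isn't listening". The problem can be reproduced with the following code.
Server (server.py):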
```python
import asyncio


async def main():
    class DatagramProtocol(asyncio.DatagramProtocol):
        def datagram_received(self, data, addr):
            print('Received %r from %s' % (data.decode(), addr))
            # Reply after 2 seconds; by then the client socket is closed.
            asyncio.get_running_loop().call_later(
                2, lambda: transport.sendto(data, addr))

        def error_received(self, exc):
            print('Received error:', exc)

    local_addr = ('127.0.0.1', 9999)
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: DatagramProtocol(),
        local_addr=local_addr
    )

    try:
        await asyncio.sleep(3600)
    finally:
        transport.close()


asyncio.run(main())
```
Client (client.py):
```python
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(b'test data', ('127.0.0.1', 9999))
s.close()
```
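Run the server script first, then the client script. The server listens on local port 9999; the client sends one UDP datagram to that port, then immediately closes its socket and exits. After receiving the datagram, the server waits 2 seconds before sending a reply to the client's source port. By then the client's socket is already closed, which causes the ERROR_PORT_UNREACHABLE error. The server prints:
Received error: [WinError 1234] No service is operating at the destination network endpoint on the remote system.
After the error, the server is still listening on local port 9999, but it no longer responds to any further data sent to that port.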
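To check whether the server is still answering after the error, a small probe client helps. The sketch below is my own illustration and not part of the reproduction scripts: probe is a hypothetical helper name, and the 3-second default timeout is an assumption chosen to outlast the server's 2-second delayed reply.

```python
import socket
from typing import Optional, Tuple


def probe(addr: Tuple[str, int], payload: bytes = b"ping",
          timeout: float = 3.0) -> Optional[bytes]:
    """Send one datagram and return the reply, or None if nothing comes back."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(timeout)
        try:
            s.sendto(payload, addr)
            # The socket stays open while waiting, so this probe does not
            # itself provoke the port-unreachable error on the server side.
            reply, _ = s.recvfrom(65535)
            return reply
        except OSError:
            # Covers a timeout as well as a connection-reset style error.
            return None
```

Calling probe(('127.0.0.1', 9999)) against server.py before running client.py should return the echoed bytes. Calling it again after the error has been printed should, going by the behavior described above, return None.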
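According to the official documentation, the default event loop on Windows is ProactorEventLoop. Open Lib/asyncio/proactor_events.py under the Python installation directory and find the _ProactorDatagramTransport class, which contains the concrete implementation of asyncio.DatagramTransport. Go to line 539 of the file: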
```python
    def _loop_writing(self, fut=None):
        try:
            if self._conn_lost:
                return

            assert fut is self._write_fut
            self._write_fut = None
            if fut:
                # We are in a _loop_writing() done callback, get the result
                fut.result()

            if not self._buffer or (self._conn_lost and self._address):
                # The connection has been closed
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                return

            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            if self._address is not None:
                self._write_fut = self._loop._proactor.send(self._sock,
                                                            data)
            else:
                self._write_fut = self._loop._proactor.sendto(self._sock,
                                                              data,
                                                              addr=addr)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal write error on datagram transport')
        else:
            self._write_fut.add_done_callback(self._loop_writing)
            self._maybe_resume_protocol()
```
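When an OSError is caught, DatagramProtocol.error_received() is called, but the code on lines 544 and 545 is not executed. That is why the server stops working. A small change to this code would fix the bug, but that has to wait for an official fix. For ordinary users who do not want to modify Python's internal code, there are two workarounds for now.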
Solution 1
The first solution is to make SelectorEventLoop the default event loop:
```python
import asyncio
import selectors


class MyPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        selector = selectors.SelectSelector()
        return asyncio.SelectorEventLoop(selector)


asyncio.set_event_loop_policy(MyPolicy())
```
However, SelectorEventLoop cannot take advantage of the high-performance IOCP on Windows, so it performs poorly. This is clearly not the solution I want.
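For completeness, the same fallback can also be sketched without a custom policy class, by creating the selector loop explicitly and running the entry point on it. This is an alternative added for illustration only: run_on_selector_loop and current_loop are hypothetical names, and the same IOCP caveat applies.

```python
import asyncio


def run_on_selector_loop(main):
    """Run the coroutine function `main` on a dedicated SelectorEventLoop."""
    loop = asyncio.SelectorEventLoop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


async def current_loop():
    # Tiny demo coroutine: report which loop is actually running.
    return asyncio.get_running_loop()
```

With this helper, the server would be started with run_on_selector_loop(main) in place of asyncio.run(main()). Note that, unlike asyncio.run(), this sketch does not cancel leftover tasks or shut down async generators on exit.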
Solution 2
The second solution is to reset the server whenever an error occurs:
```python
import asyncio


async def main():
    class DatagramProtocol(asyncio.DatagramProtocol):
        def connection_made(self, transport):
            self.transport = transport
            self.need_restart = False

        def datagram_received(self, data, addr):
            print('Received %r from %s' % (data.decode(), addr))
            asyncio.get_running_loop().call_later(
                2, lambda: self.transport.sendto(data, addr))

        def error_received(self, exc):
            print('Received error:', exc)
            # Drop the stuck transport; connection_lost() will rebuild it.
            self.need_restart = True
            self.transport.abort()

        def connection_lost(self, exc):
            if self.need_restart:
                self.need_restart = False
                print('Restarting server')
                asyncio.create_task(restart())

    async def restart():
        # Rebind the same address with a fresh transport and protocol.
        nonlocal transport
        transport, _ = await loop.create_datagram_endpoint(
            lambda: DatagramProtocol(),
            local_addr=local_addr
        )

    local_addr = ('127.0.0.1', 9999)
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: DatagramProtocol(),
        local_addr=local_addr
    )

    try:
        await asyncio.sleep(3600)
    finally:
        transport.close()


asyncio.run(main())
```
This solution is not elegant, but it is usable. I also hope the bug gets an official fix soon.
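The restart pattern from Solution 2 can also be packaged so that it is easy to exercise on any platform by simulating the error. The following is only a sketch under my own assumptions: RestartingEcho and RestartableServer are hypothetical names, the protocol echoes immediately instead of after 2 seconds, and the error is injected by calling error_received() by hand, so it does not reproduce the Windows bug itself.

```python
import asyncio


class RestartingEcho(asyncio.DatagramProtocol):
    """Echo datagrams; after any reported error, ask the owner to rebuild."""

    def __init__(self, server):
        self.server = server
        self.transport = None
        self.need_restart = False

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)

    def error_received(self, exc):
        # Same idea as Solution 2: abort the transport, then restart.
        self.need_restart = True
        self.transport.abort()

    def connection_lost(self, exc):
        if self.need_restart:
            self.need_restart = False
            self.server.schedule_restart()


class RestartableServer:
    """Hypothetical wrapper that owns the endpoint and rebuilds it on demand."""

    def __init__(self, host="127.0.0.1", port=9999):
        self.addr = (host, port)
        self.transport = None
        self.protocol = None
        self.restarts = 0
        self._task = None  # keep a reference so the restart task survives

    async def start(self):
        loop = asyncio.get_running_loop()
        self.transport, self.protocol = await loop.create_datagram_endpoint(
            lambda: RestartingEcho(self), local_addr=self.addr)
        # Remember the real port in case port 0 was requested.
        self.addr = self.transport.get_extra_info("sockname")[:2]

    def schedule_restart(self):
        self.restarts += 1
        self._task = asyncio.get_running_loop().create_task(self.start())

    def close(self):
        if self.transport is not None:
            self.transport.close()
```

Keeping the restart counter on the wrapper makes it straightforward to log how often the workaround fires, or to stop restarting after some limit if that is ever needed.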