asyncio.DatagramProtocol stops responding after receiving an error

The official Python documentation provides an example of a UDP echo server built with asyncio:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

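However, once it hits a runtime error, this UDP server stops responding and cannot continue working. Normally, sending a UDP datagram does not raise an exception even when nothing is listening at the destination address and port. On Windows, however, sending a UDP datagram to a local port that nobody is listening on triggers ERROR_PORT_UNREACHABLE (code 0x4D2, decimal 1234), described as "No service is operating at the destination network endpoint on the remote system." This has already been reported on GitHub as issue #91227, titled "asyncio proactor udp transport stops responding after send to port that isn't listening". The problem can be reproduced with the following code.

Server (server.py):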

import asyncio


async def main():
    class DatagramProtocol(asyncio.DatagramProtocol):
        def datagram_received(self, data, addr):
            print('Received %r from %s' % (data.decode(), addr))
            asyncio.get_running_loop().\
                call_later(2, lambda: transport.sendto(data, addr))

        def error_received(self, exc):
            print('Received error:', exc)

    local_addr = ('127.0.0.1', 9999)
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: DatagramProtocol(),
        local_addr=local_addr
    )
    try:
        await asyncio.sleep(3600)
    finally:
        transport.close()


asyncio.run(main())

Client (client.py):

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(b'test data', ('127.0.0.1', 9999))
s.close()

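Run the server script first, then the client script. The server listens on local port 9999; the client sends one UDP datagram to that port, then immediately closes its socket and exits. After receiving the datagram, the server waits 2 seconds and then sends a reply to the client's source port. By then the client's socket is already closed, which causes the ERROR_PORT_UNREACHABLE error. The server prints: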

Received error: [WinError 1234] 没有任何服务正在远程系统上的目标网络终结点上操作。

After the error occurs, the server is still bound to local port 9999, but it no longer responds to any further data sent to that port.
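One way to check the "no longer responds" state is a small probe client that sends a datagram and waits for the echo. The following is a minimal sketch, not part of the original reproduction: the helper name `probe` and its parameters are hypothetical, and the default timeout of 3 seconds is an assumption chosen only because server.py replies after a 2-second delay.

```python
import socket


def probe(addr, payload=b"ping", timeout=3.0):
    """Send one datagram to addr and return the reply, or None if none arrives.

    Hypothetical helper for illustration; the socket stays open while waiting,
    so the probe itself does not trigger the port-unreachable error.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(timeout)
        try:
            s.sendto(payload, addr)
            reply, _ = s.recvfrom(65535)
            return reply
        except OSError:
            # Covers the timeout as well as reset-style errors that some
            # platforms report on recvfrom() for an unreachable port.
            return None
```

Against a healthy server, `probe(('127.0.0.1', 9999))` should return the echoed payload. After client.py has triggered the error, the same call is expected to return `None`.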

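According to the official documentation, the default event loop on Windows is ProactorEventLoop. Open Lib/asyncio/proactor_events.py under the Python installation directory and find the _ProactorDatagramTransport class, which is the concrete implementation of asyncio.DatagramTransport. Go to line 539 of the file: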

    def _loop_writing(self, fut=None):
        try:
            if self._conn_lost:
                return

            assert fut is self._write_fut
            self._write_fut = None
            if fut:
                # We are in a _loop_writing() done callback, get the result
                fut.result()

            if not self._buffer or (self._conn_lost and self._address):
                # The connection has been closed
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                return

            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            if self._address is not None:
                self._write_fut = self._loop._proactor.send(self._sock,
                                                            data)
            else:
                self._write_fut = self._loop._proactor.sendto(self._sock,
                                                              data,
                                                              addr=addr)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal write error on datagram transport')
        else:
            self._write_fut.add_done_callback(self._loop_writing)
            self._maybe_resume_protocol()

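When an OSError is caught, DatagramProtocol.error_received() is called, but the code on lines 544 and 545 is not executed. This is why the server stops working. A small change here would fix the bug, but that has to wait for an official fix. For ordinary users who do not want to modify Python's internal code, there are two workarounds for now.

Option 1

The first option is to make SelectorEventLoop the default event loop: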
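The failure mode described here, a self-rescheduling callback that is re-armed only on the success path, can be shown with a toy model. This is a sketch for illustration only and is not asyncio's transport code: `ToyPump`, `rearm_on_error`, and `failing` are hypothetical names, and the jobs stand in for pending I/O operations.

```python
from collections import deque


class ToyPump:
    """Toy model only; this is not asyncio's transport code."""

    def __init__(self, jobs, rearm_on_error):
        self.jobs = deque(jobs)
        self.rearm_on_error = rearm_on_error
        self.done = []      # results of jobs that completed
        self.errors = []    # exceptions handed to the error hook
        self.armed = True   # whether another step is scheduled

    def step(self):
        """Run one job using the same try/except/else shape."""
        self.armed = False
        job = self.jobs.popleft()
        try:
            result = job()
        except OSError as exc:
            self.errors.append(exc)           # plays the role of error_received()
            self.armed = self.rearm_on_error  # the stalled variant never re-arms
        else:
            self.done.append(result)
            self.armed = True                 # the success path schedules the next step

    def run(self):
        while self.armed and self.jobs:
            self.step()


def failing():
    raise OSError("port unreachable")


jobs = [lambda: "a", failing, lambda: "b"]

stalled = ToyPump(jobs, rearm_on_error=False)
stalled.run()

recovering = ToyPump(jobs, rearm_on_error=True)
recovering.run()

print(stalled.done, recovering.done)  # ['a'] ['a', 'b']
```

In the stalled variant the job queued after the failure is never run, which mirrors how the server keeps its port open but stops doing any work.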

import asyncio
import selectors

class MyPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        selector = selectors.SelectSelector()
        return asyncio.SelectorEventLoop(selector)

asyncio.set_event_loop_policy(MyPolicy())

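However, SelectorEventLoop cannot take advantage of the high-performance IOCP on Windows, so its performance is worse. This is clearly not the solution I want.

Option 2

The second option is to reset the server when an error occurs: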
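If replacing the global event loop policy feels too invasive, a similar effect can probably be had by running the program on an explicitly constructed SelectorEventLoop. This is a minimal sketch under that assumption: `run_on_selector_loop` is a hypothetical helper, and the same IOCP caveat applies.

```python
import asyncio


def run_on_selector_loop(coro):
    """Run coro to completion on a fresh SelectorEventLoop (hypothetical helper)."""
    loop = asyncio.SelectorEventLoop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

With this helper, the last line of server.py would become `run_on_selector_loop(main())` instead of `asyncio.run(main())`, leaving the default policy untouched for the rest of the process.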

import asyncio


async def main():
    class DatagramProtocol(asyncio.DatagramProtocol):
        def connection_made(self, transport):
            self.transport = transport
            self.need_restart = False

        def datagram_received(self, data, addr):
            print('Received %r from %s' % (data.decode(), addr))
            asyncio.get_running_loop().\
                call_later(2, lambda: self.transport.sendto(data, addr))
 
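        def error_received(self, exc):
            print('Received error:', exc)
            # Mark the endpoint for restart, then drop the stalled transport.
            self.need_restart = True
            self.transport.abort()

        def connection_lost(self, exc):
            if self.need_restart:
                self.need_restart = False
                print('Restarting server')
                # Keep a reference so the task is not garbage-collected early.
                self.restart_task = asyncio.create_task(restart())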
 
    async def restart():
        nonlocal transport
        transport, _ = await loop.create_datagram_endpoint(
            lambda: DatagramProtocol(),
            local_addr=local_addr
        )

    local_addr = ('127.0.0.1', 9999)
    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: DatagramProtocol(),
        local_addr=local_addr
    )
    try:
        await asyncio.sleep(3600)
    finally:
        transport.close()
 
 
asyncio.run(main())

This approach is not elegant, but it is usable. I also hope the bug gets fixed upstream soon.