为什么 asyncio 任务在打开其中的连接时会被垃圾收集?

use*_*788 7 python python-asyncio

我正在创建一个服务器,它需要在响应时发出外部请求。为了处理并发请求,我使用 Python 的asyncio库。我遵循了标准库中的一些示例。然而,我的一些任务似乎被破坏了,打印Task was destroyed but it is pending!到我的终端。经过一些调试和研究后,我发现了一个 stackoverflow 答案,它似乎解释了原因。

我在下面创建了一个最小的示例来演示这种效果。我的问题是应该以什么方式抵消这种影响?存储对任务的硬引用(例如存储asyncio.current_task()在全局变量中)可以缓解该问题。如果我将 future 包装remote_read.read()为 ,它似乎也可以正常工作await asyncio.wait_for(remote_read.read(), 5)。但我确实觉得这些解决方案很丑陋。

# run and visit http://localhost:8080/ in your browser
import asyncio
import gc

async def client_connected_cb(reader, writer):
    remote_read, remote_write = await asyncio.open_connection("google.com", 443, ssl=True)
    await remote_read.read()

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(1)

async def main():
    server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
    await asyncio.gather(server.serve_forever(), cleanup())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

我在 macOS 10.15.7 上运行 Python 3.10。

jsb*_*eno 2

目前看来,唯一的方法实际上是手动保留引用。

也许装饰器比必须在每个异步函数中手动添加代码更方便。我选择了类设计,以便类属性可以在任务运行时保存硬引用。(包装函数中的局部变量将成为任务引用循环的一部分,并且垃圾收集仍然会触发):


# run and visit http://localhost:8080/ in your browser
import asyncio
import gc
from functools import wraps
import weakref

class Shielded:
    registry = set()

    def __init__(self, func):
        self.func = func

    async def __call__(self, *args, **kw):
        self.registry.add(task:=asyncio.current_task())
        try:
            result = await self.func(*args, **kw)
        finally:
            self.registry.remove(task)
        return result

def _Shielded(func):
    # Used along with the print sequence to assert the task was actually destroyed without commenting
    async def wrapper(*args, **kwargs):
        ref = weakref.finalize(asyncio.current_task(), lambda: print("task destroyed"))
        return await func(*args, **kwargs)
    return wrapper

@Shielded
async def client_connected_cb(reader, writer):
    print("at task start")
    #registry.append(asyncio.current_task())
    # I've connected this to a socket in an interactive session, I'd explictly .close() for debugging:
    remote_read, remote_write = await asyncio.open_connection("localhost", 8060, ssl=False)
    print("comensing remote read")
    await remote_read.read()
    print("task complete")

async def cleanup():
    while True:
        gc.collect()
        await asyncio.sleep(1)

async def main():
    server = await asyncio.start_server(client_connected_cb, "localhost", 8080)
    await asyncio.gather(server.serve_forever(), cleanup())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

此外,我想“真正看到它”,所以我创建了一个“假”_Shielded 装饰器,它只会在底层任务被删除时记录一些内容:实际上,它永远不会打印“任务完成”。