处理异步死锁

cru*_*rky 5 python-asyncio

此示例代码无限期挂起:

import asyncio


async def main():
    async def f():
        await g_task

    async def g():
        await f_task

    f_task = asyncio.create_task(f())
    g_task = asyncio.create_task(g())
    await f_task


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

我正在寻找一种自动检测和处理死锁的方法,就像 GoLang 那样。

到目前为止,我想出了一个变体asyncio.wait_for()

[编辑] 大修设计

https://gist.github.com/gimperiale/549cbad04c24d870145d3f38fbb8e6f0

原始代码中的 1 行更改:

await wait_check_deadlock(f_task)
Run Code Online (Sandbox Code Playgroud)

它有效,但有两个主要问题:

  1. 它依赖于asyncio.Task._fut_waiter,这是 CPython 的一个实现细节
  2. 死锁的任务将永远保留在 RAM 中。aw.cancel()似乎什么都不做。如果我捕捉到辅助函数引发的 RecursionError,asyncio.run() 在尝试取消所有任务时会引发另一个 RecursionError。

是否有更强大的解决方案?

Dim*_*nek 3

死锁避免已经被研究了很多,存在一些实用的解决方案,但在一般情况下,问题是不可判定的(我认为它可以简化为停机问题)。

为了说明实用性,请考虑以下内容:

await asyncio.sleep(2 ** (1 / random.random()))
Run Code Online (Sandbox Code Playgroud)

根据您的运气,它要么很快就会回来,要么“几乎永远不会”。

这个技巧可以用来表明基于回调的程序是无法预测的:

f = asyncio.Future()

async foo():
    await asyncio.sleep(2 ** (1 / random.random()))
    f.set_result(None)

async bar():
    await f

await asyncio.gather(foo(), bar())
Run Code Online (Sandbox Code Playgroud)

同样,它可以应用于您的“纯”异步/等待程序:

async def f():
    await g_task

async def g():
    await asyncio.wait(f_task,
                       asyncio.sleep(2 ** (1 / random.random())),
                       return_when=asyncio.FIRST_COMPLETED)

f_task = asyncio.create_task(f())
g_task = asyncio.create_task(g())
await f_task
Run Code Online (Sandbox Code Playgroud)

同时,不完美但实用的死锁检测器可能非常有用,请考虑将您的代码发布到核心 asyncio 开发人员和/或独立库。

当前的做法是运行测试,PYTHONASYNCIODEBUG=1显示未等待的任务(在读取结果/异常之前销毁)。

您的库可能会更好,例如,它可以报告某些任务何时花费超过 X 的时间,或者何时取决于给定任务的任务 DAG 变得太大。

  • 我不会将过长的睡眠视为死锁 - 就像我不认为当没有客户端连接且服务器只是等待 select() 时 Web/RPC 服务器会被卡住一样。死锁具体是任务之间的循环依赖。 (3认同)