如何将自定义异常抛出到正在运行的任务中

Jaa*_*rus 5 python python-3.x python-asyncio

我试图弄清楚是否有可能将自定义异常抛出到正在运行的 asyncio 任务中,类似于在底层协程中引发的Task.cancel(self)调度 a所实现的CancelledError

我遇到了Task.get_coro().throw(exc),但调用它似乎打开了一大罐蠕虫,因为我们可能会使任务处于不良状态。尤其是考虑到任务CancelledError投入协程时发生的所有机制。

考虑以下示例:

import asyncio

class Reset(Exception):
    pass

async def infinite():
    while True:
        try:
            print('work')
            await asyncio.sleep(1)
            print('more work')
        except Reset:
            print('reset')
            continue
        except asyncio.CancelledError:
            print('cancel')
            break

async def main():
    infinite_task = asyncio.create_task(infinite())
    await asyncio.sleep(0)  # Allow infinite_task to enter its work loop.
    infinite_task.get_coro().throw(Reset())
    await infinite_task

asyncio.run(main())

## OUTPUT ##
# "work"
# "reset"
# "work"
# hangs forever ... bad :(
Run Code Online (Sandbox Code Playgroud)

我尝试做的甚至可行吗?感觉好像我不应该像这样操纵底层协程。任何解决方法?

Mik*_*mov 3

无法将自定义异常抛出到正在运行的任务中。你不应该搞乱.throw- 这是实现的细节,改变它可能会破坏某些东西。

如果您想将信息(有关重置)传递到任务中,请通过参数来完成。下面是它的实现方法:

import asyncio
from contextlib import suppress


async def infinite(need_reset):
    try:
        while True:
            inner_task = asyncio.create_task(inner_job())

            await asyncio.wait(
                [
                    need_reset.wait(),
                    inner_task
                ], 
                return_when=asyncio.FIRST_COMPLETED
            )

            if need_reset.is_set():
                print('reset')
                await cancel(inner_task)
                need_reset.clear()
    except asyncio.CancelledError:
        print('cancel')
        raise  # you should never suppress, see:
               # /sf/answers/2350522541/


async def inner_job():
    print('work')
    await asyncio.sleep(1)
    print('more work')


async def cancel(task):
    # more info: /sf/answers/3066719071/
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


async def main():
    need_reset = asyncio.Event()
    infinite_task = asyncio.create_task(infinite(need_reset))

    await asyncio.sleep(1.5)
    need_reset.set()

    await asyncio.sleep(1.5)
    await cancel(infinite_task)


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

输出:

work
more work
work
reset
work
more work
work
cancel
Run Code Online (Sandbox Code Playgroud)