使用asyncio.wait在任务异常后重试任务

Lem*_*nPi 6 python coroutine async-await python-asyncio

我有多个协程应同时运行,其中一些可能会引发异常。在这种情况下,应重新运行协程。我该如何完成?我正在尝试做的最小演示:

import asyncio
import time

t = time.time()


async def c1():
    print("finished c1 {}".format(time.time() - t))


async def c2():
    await asyncio.sleep(3)
    print("finished c2 {}".format(time.time() - t))


called = False


async def c3():
    global called
    # raises an exception the first time it's called
    if not called:
        called = True
        raise RuntimeError("c3 called the first time")
    print("finished c3 {}".format(time.time() - t))


async def run():
    pending = {c1(), c2(), c3()}

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                pending.add(task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))


asyncio.get_event_loop().run_until_complete(run())
Run Code Online (Sandbox Code Playgroud)

c3()表示某些协程将失败,需要重新运行。演示的问题是完成的任务已完成并具有异常集,因此当我将其放回挂起的集时,下一个运行循环将立即退出而无需重新运行,c3()因为它已经完成。

有没有办法清除任务,使其c3()再次运行?我知道附加到任务的协程实例将无法再等待,否则我会得到

RuntimeError('cannot reuse already awaited coroutine',)

这意味着我必须手动管理从协程实例到生成它的协程的映射,然后使用task._coro- 检索失败的协程实例-是这样吗?

Lem*_*nPi 8

编辑:任务本身可能是地图中的关键,这更干净。

async def run():
    tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
    pending = set(tasks.keys())

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                coro = tasks[task]
                new_task = asyncio.ensure_future(coro())
                tasks[new_task] = coro
                pending.add(new_task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))
Run Code Online (Sandbox Code Playgroud)

  • 使“coros”成为**任务**到协程函数的映射:“coros = {loop.create_task(c()): c for c in (c1, c2, c3)}”。然后你可以通过任务查找协程:`coro = coros[task]`。其他一切都应该按照编写的方式工作,并且代码将是干净的并且仅使用公共 API。 (3认同)
  • @user4815162342 谢谢,忘记了 asyncio.wait 也可以接受任务! (2认同)