捕获单个任务中的异常并重新启动它们

Gor*_*ler 3 python python-3.x python-asyncio python-3.6

如果我asyncio在顶级类中创建一系列任务,所有这些任务本质上都应该永远运行,如下所示:

self.event_loop.create_task(...)
self.event_loop.create_task(...)
self.event_loop.create_task(...)
...

self.event_loop.run_forever()

# Once we fall out of the event loop, collect all remaining tasks,
# cancel them, and terminate the asyncio event loop
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
self.event_loop.run_until_complete(group)
self.event_loop.close()
Run Code Online (Sandbox Code Playgroud)

上面的代码没有处理以下情况,我发现我越来越需要这种情况,而且我还没有在谷歌搜索或asyncio文档中看到一个例子:

如果其中一项任务因异常而失败,则不会处理该异常 - 所有其他任务都会继续进行,但该任务只是静默停止(异常输出除外)。

那么,我该如何:

  • 设置要处理的异常,以便失败不再沉默
  • 最重要的是,重新启动失败的任务,有效地self.event_loop.create_task(...)再次运行,只是为了那个任务?这似乎需要找到在事件循环中收到异常的任务,将其删除,然后添加一个新任务 - 我不清楚如何做到这一点。
  • 允许没有问题的任务不间断地继续。希望避免处理收到异常的任务的任何副作用。

Mar*_*ers 8

未捕获的异常附加到任务对象,可以通过Task.exception()方法从中检索。该self.event_loop.create_task(...)调用返回任务对象,因此您需要收集这些对象以检查异常。

如果您想在发生异常时重新安排任务,那么您需要在新任务中执行此操作(因为您希望它在事件循环中运行),或者使用捕获异常的包装器协同例程并再次重新运行给定的协程。

后者可能类似于:

import traceback

def rerun_on_exception(coro, *args, **kwargs):
    while True:
        try:
            await coro(*args, **kwargs)
        except asyncio.CancelledError:
            # don't interfere with cancellations
            raise
        except Exception:
            print("Caught exception")
            traceback.print_exc()
Run Code Online (Sandbox Code Playgroud)

然后在将它们作为任务调度时用上面的协程包装你的协程:

self.event_loop.create_task(rerun_on_exception(coroutine_uncalled, arg1value, ... kwarg1=value, ...)
Run Code Online (Sandbox Code Playgroud)

例如,每次出现异常时都传入参数以创建协程。

另一种选择是asyncio.wait()在单独的任务中使用,以便您可以在循环运行时监视异常,并决定如何在那里处理异常,然后:

def exception_aware_scheduler(*task_definitions, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    task_arguments = {
        loop.create_task(coro(*args, **kwargs)): (coro, args, kwargs)
        for coro, args, kwargs in task_definitions
    }
    while tasks:
        done, pending = await asyncio.wait(
            tasks.keys(), loop=loop, return_when=asyncio.FIRST_EXCEPTION
        )
        for task in done:
            if task.exception() is not None:
                print('Task exited with exception:')
                task.print_stack()
                print('Rescheduling the task\n')
                coro, args, kwargs = tasks.pop(task)
                tasks[loop.create_task(coro(*args, **kwargs))] = coro, args, kwargs
Run Code Online (Sandbox Code Playgroud)

asyncio.wait()呼叫由事件循环再次获得控制时,当任务的任何一个安排的退出,由于异常,但直到发生这种情况,任务可能已被取消或简单地完成他们的工作。当任务因异常退出时,您需要一种方法来再次创建相同的协程(使用相同的参数),因此是*args, **kwargs上面的设置。

您只需安排exception_aware_scheduler(), 传入您想要传入的任务:

tasks = (
    (coro1, (), {}),  # no arguments
    (coro2, ('arg1', 'arg2'), {}),
    # ...
)
loop.create_task(exception_aware_scheduler(*tasks, loop=loop))
Run Code Online (Sandbox Code Playgroud)