Gor*_*ler 3 python python-3.x python-asyncio python-3.6
如果我asyncio在顶级类中创建一系列任务,所有这些任务本质上都应该永远运行,如下所示:
self.event_loop.create_task(...)
self.event_loop.create_task(...)
self.event_loop.create_task(...)
...
self.event_loop.run_forever()
# Once we fall out of the event loop, collect all remaining tasks,
# cancel them, and terminate the asyncio event loop
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
self.event_loop.run_until_complete(group)
self.event_loop.close()
Run Code Online (Sandbox Code Playgroud)
上面的代码没有处理以下情况,我发现我越来越需要这种情况,而且我还没有在谷歌搜索或asyncio文档中看到一个例子:
如果其中一项任务因异常而失败,则不会处理该异常 - 所有其他任务都会继续进行,但该任务只是静默停止(异常输出除外)。
那么,我该如何:
self.event_loop.create_task(...)再次运行,只是为了那个任务?这似乎需要找到在事件循环中收到异常的任务,将其删除,然后添加一个新任务 - 我不清楚如何做到这一点。未捕获的异常附加到任务对象,可以通过Task.exception()方法从中检索。该self.event_loop.create_task(...)调用返回任务对象,因此您需要收集这些对象以检查异常。
如果您想在发生异常时重新安排任务,那么您需要在新任务中执行此操作(因为您希望它在事件循环中运行),或者使用捕获异常的包装器协同例程并再次重新运行给定的协程。
后者可能类似于:
import traceback
def rerun_on_exception(coro, *args, **kwargs):
while True:
try:
await coro(*args, **kwargs)
except asyncio.CancelledError:
# don't interfere with cancellations
raise
except Exception:
print("Caught exception")
traceback.print_exc()
Run Code Online (Sandbox Code Playgroud)
然后在将它们作为任务调度时用上面的协程包装你的协程:
self.event_loop.create_task(rerun_on_exception(coroutine_uncalled, arg1value, ... kwarg1=value, ...)
Run Code Online (Sandbox Code Playgroud)
例如,每次出现异常时都传入参数以创建协程。
另一种选择是asyncio.wait()在单独的任务中使用,以便您可以在循环运行时监视异常,并决定如何在那里处理异常,然后:
def exception_aware_scheduler(*task_definitions, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
task_arguments = {
loop.create_task(coro(*args, **kwargs)): (coro, args, kwargs)
for coro, args, kwargs in task_definitions
}
while tasks:
done, pending = await asyncio.wait(
tasks.keys(), loop=loop, return_when=asyncio.FIRST_EXCEPTION
)
for task in done:
if task.exception() is not None:
print('Task exited with exception:')
task.print_stack()
print('Rescheduling the task\n')
coro, args, kwargs = tasks.pop(task)
tasks[loop.create_task(coro(*args, **kwargs))] = coro, args, kwargs
Run Code Online (Sandbox Code Playgroud)
该asyncio.wait()呼叫由事件循环再次获得控制时,当任务的任何一个安排的退出,由于异常,但直到发生这种情况,任务可能已被取消或简单地完成他们的工作。当任务因异常退出时,您需要一种方法来再次创建相同的协程(使用相同的参数),因此是*args, **kwargs上面的设置。
您只需安排exception_aware_scheduler(), 传入您想要传入的任务:
tasks = (
(coro1, (), {}), # no arguments
(coro2, ('arg1', 'arg2'), {}),
# ...
)
loop.create_task(exception_aware_scheduler(*tasks, loop=loop))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1247 次 |
| 最近记录: |