处理永远不会在 python3 asyncio 中终止的任务

zwo*_*wol 5 python-3.x python-asyncio

有时异步任务没有有意义的终止条件——例如,在下面的程序中,“rate_limiter”任务在队列上以固定速率永远生成令牌流。

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)

main()
Run Code Online (Sandbox Code Playgroud)

这个程序完美运行,只是asyncio 库认为这是一个编程错误,rltask当没有任何东西可以限制速率时就扔掉它;你会收到这样的投诉

...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
      wait_for=<Future pending cb=[Task._wakeup()]>>
Run Code Online (Sandbox Code Playgroud)

(无论是否处于调试模式)。

我可以用诸如告诉rate_limiter协程跳出循环的事件来解决这个问题,但这感觉就像额外的代码并没有真正的好处。在使用 asyncio 时,您应该如何处理这种情况?

编辑:我不清楚:我正在寻找的是类似于daemon线程标志的东西:使我不必等待特定任务的东西,理想情况下表示为任务本身或其协程的注释。我也会接受一个证明没有这种机制的答案。我已经知道解决方法。

jfs*_*jfs 2

避免出现“任务已销毁但正在等待处理!”的情况。警告,如果为相应的 future 对象设置了虚拟结果,则可以在退出程序时将永无止境的协程标记为已完成:

#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager


@contextmanager
def finishing(coro_or_future, *, loop=None):
    """Mark a never ending coroutine or future as done on __exit__."""
    fut = asyncio.ensure_future(
        coro_or_future, loop=loop)  # start infinite loop
    try:
        yield
    finally:
        if not fut.cancelled():
            fut.set_result(None)  # mark as finished


async def never_ends():
    for c in itertools.cycle('\|/-'):
        print(c, end='\r', flush=True)
        await asyncio.sleep(.3)


with closing(asyncio.get_event_loop()) as loop, \
     finishing(never_ends(), loop=loop):
    loop.run_until_complete(asyncio.sleep(3))  # do something else
Run Code Online (Sandbox Code Playgroud)

它假设您的协程在进程退出之前不需要显式清理。在后一种情况下,定义一个显式的清理过程:提供可以调用的方法(例如,server.close()server.wait_closed()),或传递调用者应在关闭时设置的事件(asyncio.Event),或引发异常(例如CancelledError)。

引入的好处finishing()是检测错误,即,您不应该忽略警告,除非它被调用明确地静音finishing()