wait queue.put(item) 到 asyncio.Queue 上似乎没有释放事件循环的控制

Pau*_*l O 3 python queue async-await python-asyncio

在这个简单的生产者/消费者示例中,就好像await queue.put(item)不释放事件循环以允许消费者运行直到完成。这会导致生产者将其所有项目放入队列中,然后消费者才能将其取出。

这是预期的吗?

await queue.put(item)如果我遵循with ,我就会得到我正在寻找的结果await asyncio.sleep(0)

然后,生产者将 1 个项目放入队列中,然后消费者从队列中取出 1 个项目。

我在 Python 3.6.8 和 3.7.2 中得到相同的结果。

import asyncio

async def produce(queue, n):
    for x in range(1, n + 1):
        print('producing {}/{}'.format(x, n))
        item = str(x)
        await queue.put(item)
        # await asyncio.sleep(0)
    await queue.put(None)

async def consume(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print('consuming item {}...'.format(item))

loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
Run Code Online (Sandbox Code Playgroud)

use*_*342 5

这会导致生产者将其所有项目放入队列中,然后消费者才能将其取出。这是预期的吗?

是的。问题是您的队列是无界的,因此将某些内容放入其中永远不会挂起生产者,因此永远不会屈服于其他协程。这同样适用于立即提供数据的所有等待,例如EOF 处的读取

如果生产者的循环包含另一个暂停源,例如等待实际输入(毕竟它必须从某个地方获取项目),那么这将导致它暂停并且问题不会立即引起注意。使用强制暂停asyncio.sleep(0)也可以,但它很脆弱,因为它依赖于单个暂停来运行消费者。情况可能并不总是如此,因为消费者本身可以等待队列以外的某些事件。

无界队列在某些情况下是有意义的,例如当队列预先填充了任务时,或者生产者的体系结构将项目数量限制在合理的数量时。但如果队列项是动态生成的,最好添加一个界限。该界限保证了生产者的背压,并确保它不会垄断事件循环。

  • @PaulO是的,这是正确的概括 - `await` 并不能保证屈服于事件循环,您可以通过在无限循环中等待无操作协程来轻松检查事件循环。(`asyncio.sleep(0)` 实际上是另一个方向的[一种特殊情况](https://github.com/python/asyncio/issues/284)。)只有当协程运行时,事件循环才会接管被强制挂起,因为等待的事物无法提供结果,例如,当等待从尚未准备好读取的套接字读取时,等等。有关更多详细信息,请参阅[此答案](/sf/answers/3417142361/)。 (2认同)