有效使用多个 Asyncio 队列

DrS*_*och 3 python-3.x python-asyncio aiohttp

我目前正在构建一个项目,需要向各个端点发出多个请求。我将这些请求包装在 Aiohttp 中以允许异步。

问题:
我有三个队列:queue1queue2queue3。此外,我还有三个工作函数(worker1worker2worker3),它们与各自的队列关联。第一个队列立即填充运行前已知的列表 ID。当请求完成并将数据提交到数据库时,它将 ID 传递给queue2。Aworker2将获取此 ID 并请求更多数据。根据这些数据,它将开始生成 ID 列表(与 中的 ID 不同)queue1/queue2worker2会将 ID 放入 中queue3。最后,在提交到数据库之前,worker3将从中获取此 ID并请求更多数据。queue3

问题queue.join()是由于阻塞调用而出现的。每个工作线程都绑定到一个单独的队列,因此连接queue1将阻塞,直到完成。这很好,但它也违背了使用异步的目的。如果不使用join()该程序,则无法检测队列何时完全为空。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。

基本代码概要如下:

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))

    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))

    for i in range(10):
        tasks.append(asyncio.create_task(worker3(queue3)))

    for i in IDs:
       queue1.put_nowait(i)

    await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)

工作函数处于无限循环中,等待项目进入队列。

当数据全部处理完后,将不会退出并且程序将挂起。

有没有办法有效管理工人并妥善结束?

use*_*342 5

正如这个答案中很好地解释的那样,Queue.join当注入队列的所有工作完成时,用于通知生产者。由于您的第一个队列不知道特定项目何时完成(它会相乘并分发到其他队列),因此join不是适合您的工具。

从您的代码来看,您的工作人员似乎只需要运行处理队列初始项目所需的时间。如果是这种情况,那么您可以使用关闭哨兵来通知工作人员退出。例如:

async with aiohttp.ClientSession() as session:

    # ... create tasks as above ...

    for i in IDs:
       queue1.put_nowait(i)
    queue1.put_nowait(None)  # no more work

    await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)

这就像您的原始代码,但具有明确的关闭请求。工作人员必须检测哨兵并做出相应反应:将其传播到下一个队列/工作人员并退出。例如,在worker1

while True:
    item = queue1.get()
    if item is None:
        # done with processing, propagate sentinel to worker2 and exit
        await queue2.put(None)
        break
    # ... process item as usual ...
Run Code Online (Sandbox Code Playgroud)

在其他两个工作线程中执行相同的操作(除了worker3不会传播的工作线程,因为没有下一个队列)将导致一旦工作完成,所有三个任务都会完成。由于队列是先进先出的,工作人员可以在遇到哨兵后安全退出,知道没有项目被丢弃。显式关闭还将关闭队列与恰好为空的队列区分开来,从而防止工作人员由于队列暂时为空而过早退出。

直到 Python 3.7,该技术实际上已在 的文档中进行了演示Queue,但该示例有点令人困惑地显示了关闭Queue.join哨兵的使用和使用。两者是分开的,可以独立使用。(同时使用它们也可能是有意义的,例如用于Queue.join等待“里程碑”,然后将其他东西放入队列中,同时保留哨兵来阻止工作人员。)