DrS*_*och 3 python-3.x python-asyncio aiohttp
我目前正在构建一个项目,需要向各个端点发出多个请求。我将这些请求包装在 Aiohttp 中以允许异步。
问题:
我有三个队列:queue1、queue2和queue3。此外,我还有三个工作函数(worker1、worker2、worker3),它们与各自的队列关联。第一个队列立即填充运行前已知的列表 ID。当请求完成并将数据提交到数据库时,它将 ID 传递给queue2。Aworker2将获取此 ID 并请求更多数据。根据这些数据,它将开始生成 ID 列表(与 中的 ID 不同)queue1/queue2。worker2会将 ID 放入 中queue3。最后,在提交到数据库之前,worker3将从中获取此 ID并请求更多数据。queue3
问题queue.join()是由于阻塞调用而出现的。每个工作线程都绑定到一个单独的队列,因此连接queue1将阻塞,直到完成。这很好,但它也违背了使用异步的目的。如果不使用join()该程序,则无法检测队列何时完全为空。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。
基本代码概要如下:
queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()
async with aiohttp.ClientSession() as session:
for i in range(3):
tasks.append(asyncio.create_task(worker1(queue1)))
for i in range(3):
tasks.append(asyncio.create_task(worker2(queue2)))
for i in range(10):
tasks.append(asyncio.create_task(worker3(queue3)))
for i in IDs:
queue1.put_nowait(i)
await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)
工作函数处于无限循环中,等待项目进入队列。
当数据全部处理完后,将不会退出并且程序将挂起。
有没有办法有效管理工人并妥善结束?
正如这个答案中很好地解释的那样,Queue.join当注入队列的所有工作完成时,用于通知生产者。由于您的第一个队列不知道特定项目何时完成(它会相乘并分发到其他队列),因此join不是适合您的工具。
从您的代码来看,您的工作人员似乎只需要运行处理队列初始项目所需的时间。如果是这种情况,那么您可以使用关闭哨兵来通知工作人员退出。例如:
async with aiohttp.ClientSession() as session:
# ... create tasks as above ...
for i in IDs:
queue1.put_nowait(i)
queue1.put_nowait(None) # no more work
await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)
这就像您的原始代码,但具有明确的关闭请求。工作人员必须检测哨兵并做出相应反应:将其传播到下一个队列/工作人员并退出。例如,在worker1:
while True:
item = queue1.get()
if item is None:
# done with processing, propagate sentinel to worker2 and exit
await queue2.put(None)
break
# ... process item as usual ...
Run Code Online (Sandbox Code Playgroud)
在其他两个工作线程中执行相同的操作(除了worker3不会传播的工作线程,因为没有下一个队列)将导致一旦工作完成,所有三个任务都会完成。由于队列是先进先出的,工作人员可以在遇到哨兵后安全退出,知道没有项目被丢弃。显式关闭还将关闭队列与恰好为空的队列区分开来,从而防止工作人员由于队列暂时为空而过早退出。
直到 Python 3.7,该技术实际上已在 的文档中进行了演示Queue,但该示例有点令人困惑地显示了关闭Queue.join哨兵的使用和使用。两者是分开的,可以独立使用。(同时使用它们也可能是有意义的,例如用于Queue.join等待“里程碑”,然后将其他东西放入队列中,同时保留哨兵来阻止工作人员。)
| 归档时间: |
|
| 查看次数: |
2594 次 |
| 最近记录: |