Abi*_*ics 0 python queue producer-consumer python-asyncio
我使用这个答案中的代码,但是asyncio.exceptions.CancelledError
当队列为空时得到。在实际项目中,我将任务添加到消费者的队列中,这就是我使用while True
语句的原因
我压缩该代码以使调试更容易:
import asyncio
import traceback
async def consumer(queue: asyncio.Queue):
try:
while True:
number = await queue.get() # here is exception
queue.task_done()
print(f'consumed {number}')
except BaseException:
traceback.print_exc()
async def main():
queue = asyncio.Queue()
for i in range(3):
await queue.put(i)
consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
和错误:
consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
number = await queue.get()
File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
await getter
asyncio.exceptions.CancelledError
Run Code Online (Sandbox Code Playgroud)
顺便说一句, 的文档是queue.get()
这么说的If queue is empty, wait until an item is available
。这个错误的真正原因是什么?也许有更好的解决方案?
原因是因为你取消了任务:
请求取消任务。
这会安排在事件循环的下一个周期将 CancelledError 异常抛出到包装的协程中。
您有几种选择来处理这个问题:
1.使用asyncio.gather
如果 return_exceptions 为 True,则异常将被视为与成功结果相同,并聚合在结果列表中。
await queue.join()
for c in consumers:
c.cancel()
await asyncio.gather(*consumers, return_exceptions=True)
Run Code Online (Sandbox Code Playgroud)
2. 捕获consumer中的异常
async def consumer(q):
while True:
num = await q.get()
try:
print(f"Working on: {num}")
except asyncio.CancelledError:
print(f"Exiting...")
break
finally:
q.task_done()
Run Code Online (Sandbox Code Playgroud)
3. 抑制异常
from contextlib import suppress
async def consumer(q):
with suppress(asyncio.CancelledError):
while True:
num = await q.get()
print(f"Working on: {num}")
q.task_done()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3925 次 |
最近记录: |