Mr.*_*tan 3 python python-asyncio
因此,考虑到一些复杂的设置,它用于生成要半并行运行的查询列表(使用信号量不要同时运行太多查询,以免对服务器进行 DDoS)。
我有一个(本身是异步的)函数,它创建许多查询:
async def run_query(self, url):
async with self.semaphore:
return await some_http_lib(url)
async def create_queries(self, base_url):
# ...gathering logic is ofc a bit more complex in the real setting
urls = await some_http_lib(base_url).json()
coros = [self.run_query(url) for url in urls] # note: not executed just yet
return coros
async def execute_queries(self):
queries = await self.create_queries('/seom/url')
_logger.info(f'prepared {len(queries)} queries')
results = []
done = 0
# note: ofc, in this simple example call these would not actually be asynchronously executed.
# in the real case i'm using asyncio.gather, this just makes for a slightly better
# understandable example.
for query in queries:
# at this point, the request is actually triggered
result = await query
# ...some postprocessing
if not result['success']:
raise QueryException(result['message']) # ...internal exception
done += 1
_logger.info(f'{done} of {len(queries)} queries done')
results.append(result)
return results
Run Code Online (Sandbox Code Playgroud)
现在这工作得非常好,完全按照我的计划执行,并且我可以通过中止整个操作来处理其中一个查询中的异常。
async def run():
try:
return await QueryRunner.execute_queries()
except QueryException:
_logger.error('something went horribly wrong')
return None
Run Code Online (Sandbox Code Playgroud)
唯一的问题是程序被终止,但留给我的是通常的RuntimeWarning: coroutine QueryRunner.run_query was never awaited,因为队列中后面的查询(正确地)没有执行,因此没有等待。
有什么办法可以取消这些未等待的协程吗?是否可以通过其他方式抑制此警告?
[编辑]关于如何在这个简单示例之外执行查询的更多上下文:查询通常分组在一起,因此使用不同的参数多次调用 create_queries() 。然后循环所有收集的组并使用 asyncio.gather(group) 执行查询。这会等待一组的所有查询,但如果其中一个查询失败,其他组也会被取消,从而导致抛出错误。
因此,您询问如何取消尚未等待或传递给 的协程gather。有两种选择:
asyncio.create_task(c).cancel()c.close()协程对象第一个选项有点重量级(它创建一个任务只是为了立即取消它),但它使用记录的 asyncio 功能。第二种选择更轻量级,但也更底层。
上述内容适用于从未转换为任务的协程对象(例如,通过将它们传递给gatheror )。wait如果他们有,例如,如果您调用asyncio.gather(*coros),其中一个提出,并且您想取消其余的,您应该更改代码以首先使用 将它们转换为任务asyncio.create_task(),然后调用gather,并使用finally取消未完成的任务:
tasks = list(map(asyncio.create_task, coros))
try:
results = await asyncio.gather(*tasks)
finally:
# if there are unfinished tasks, that is because one of them
# raised - cancel the rest
for t in tasks:
if not t.done():
t.cancel()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2033 次 |
| 最近记录: |