Python asyncio 取消未等待的协程

Mr.*_*tan 3 python python-asyncio

因此,考虑到一些复杂的设置,它用于生成要半并行运行的查询列表(使用信号量不要同时运行太多查询,以免对服务器进行 DDoS)。

我有一个(本身是异步的)函数,它创建许多查询:

async def run_query(self, url):
    async with self.semaphore:
        return await some_http_lib(url)

async def create_queries(self, base_url):
   # ...gathering logic is ofc a bit more complex in the real setting
   urls = await some_http_lib(base_url).json()
 
   coros = [self.run_query(url) for url in urls]  # note: not executed just yet
   return coros

async def execute_queries(self):
   queries = await self.create_queries('/seom/url')
   _logger.info(f'prepared {len(queries)} queries')
   results = []
   done = 0

   # note: ofc, in this simple example call these would not actually be asynchronously executed.
   # in the real case i'm using asyncio.gather, this just makes for a slightly better
   # understandable example.
   for query in queries:
       # at this point, the request is actually triggered
       result = await query

       # ...some postprocessing

       if not result['success']:
           raise QueryException(result['message'])  # ...internal exception
       
       done  += 1
       _logger.info(f'{done} of {len(queries)} queries done')
       results.append(result)
    
   return results
Run Code Online (Sandbox Code Playgroud)

现在这工作得非常好,完全按照我的计划执行,并且我可以通过中止整个操作来处​​理其中一个查询中的异常。

async def run():
    try:
       return await QueryRunner.execute_queries()
    except QueryException:
       _logger.error('something went horribly wrong')
       return None
Run Code Online (Sandbox Code Playgroud)

唯一的问题是程序被终止,但留给我的是通常的RuntimeWarning: coroutine QueryRunner.run_query was never awaited,因为队列中后面的查询(正确地)没有执行,因此没有等待。

有什么办法可以取消这些未等待的协程吗?是否可以通过其他方式抑制此警告?

[编辑]关于如何在这个简单示例之外执行查询的更多上下文:查询通常分组在一起,因此使用不同的参数多次调用 create_queries() 。然后循环所有收集的组并使用 asyncio.gather(group) 执行查询。这会等待一组的所有查询,但如果其中一个查询失败,其他组也会被取消,从而导致抛出错误。

use*_*342 5

因此,您询问如何取消尚未等待或传递给 的协程gather。有两种选择:

  • 你可以打电话asyncio.create_task(c).cancel()
  • 您可以直接调用c.close()程对象

第一个选项有点重量级(它创建一个任务只是为了立即取消它),但它使用记录的 asyncio 功能。第二种选择更轻量级,但也更底层。

上述内容适用于从未转换为任务的协程对象(例如,通过将它们传递给gatheror )。wait如果他们有,例如,如果您调用asyncio.gather(*coros),其中一个提出,并且您想取消其余的,您应该更改代码以首先使用 将它们转换为任务asyncio.create_task(),然后调用gather,并使用finally取消未完成的任务:

tasks = list(map(asyncio.create_task, coros))
try:
    results = await asyncio.gather(*tasks)
finally:
    # if there are unfinished tasks, that is because one of them
    # raised - cancel the rest
    for t in tasks:
        if not t.done():
            t.cancel()
Run Code Online (Sandbox Code Playgroud)