asyncio task was destroyed but it is pending

spy*_*der 5 python asynchronous event-loop coroutine python-asyncio

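I am working on a sample program that reads data from a data source (csv or rdbms) in chunks, applies some transformations, and sends it to a server via a socket.

Because the csv is very large, I want to break off reading after a few chunks for testing purposes. Unfortunately, something goes wrong and I do not know what it is or how to fix it. I probably have to cancel something, but I am not sure where and how. I get the following error: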

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

The sample code is:

import asyncio
import json

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.001)
        yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Mis*_*agi 6

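Many async resources, such as generators, need the help of an event loop to clean up. When an async for loop stops iterating an async generator via break, the generator is not closed at that point; it stays suspended and is cleaned up only by the garbage collector. This means the cleanup task is pending (waiting for the event loop) but gets destroyed (by the garbage collector, once the loop is gone).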
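To make the ordering visible, here is a minimal sketch (not the asker's program): read_chunks and the events list are hypothetical names introduced for illustration. It records when the generator's finally block actually runs relative to leaving the loop.

```python
import asyncio

events = []  # hypothetical log of what happened, in order


async def read_chunks():
    # Hypothetical stand-in for the asker's readChunks generator.
    try:
        for n in range(10):
            await asyncio.sleep(0)
            yield n
    finally:
        # Cleanup that awaits can only run while an event loop is running.
        await asyncio.sleep(0)
        events.append("generator cleaned up")


async def main():
    gen = read_chunks()
    async for n in gen:
        if n >= 2:
            break  # leaves the loop, but does NOT close the generator
    events.append("loop left")
    # The generator is still suspended at its yield; close it explicitly.
    await gen.aclose()
    events.append("aclose returned")


asyncio.run(main())
print(events)
```

The finally block runs only once aclose() is awaited, not when break is executed. Without that call, the cleanup is left to whoever finalizes the generator later.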

The most straightforward fix is to aclose the generator explicitly:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

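These patterns can be simplified using asyncstdlib (disclaimer: I maintain this library). asyncstdlib.islice allows taking a fixed number of items before cleanly closing the generator: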

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break