python3.6 新更改 'async for' 与 enumerate 不兼容吗

Tar*_*tor 5 python asynchronous mongodb python-asyncio python-3.6

我正在尝试将我的应用程序从 python2.7 移动到 python3.6,用于 asyncio 和相关库,但我发现某些功能无法正常工作,因为我除外。我使用 motor 从 mongodb 异步查询,例如:

async def do_query():
    song_already_processed = set()
    song_table = db.song_table
    async for index, item in enumerate(song_table.find({'lang': 'English'},
                              {'id': 1, '_id': 0, 'title': 1, 'artist.name': 1})):
        if index > 100:
            break
        if item['id'] in song_already_processed:
            continue
        song_already_processed.add(item['id'])

    print(len(song_already_processed))
Run Code Online (Sandbox Code Playgroud)

但它引发了错误:

TypeError: 'AsyncIOMotorCursor' object is not iterable
Run Code Online (Sandbox Code Playgroud)

我认为异步迭代器协议与普通的迭代器协议不同,所以枚举不能很好地工作。是否有我可以使用的异步枚举?

顺便说一句,我知道很多方法来获取 100 个文档并停止迭代,我只想知道如何正确使用“async for”

Mis*_*agi 5

asyncstdlib(免责声明:我维护这个包)提供async了标准库助手的变体。具体来说,它的asyncstdlib.enumerate工作原理类似于enumerate但需要并生成一个异步可迭代对象。

import asyncstdlib as a

async for index, item in a.enumerate(song_table.find(...)):
    if index > 100:
        break
    ...
Run Code Online (Sandbox Code Playgroud)

请注意,break退出异步迭代通常不是一个好主意——迭代器可能不会在迭代结束时进行清理(有关详细信息,请参阅PEP 533)。

由于您仅使用index获取前 100 个项目,因此您还可以asyncstdlib.islice直接使用来安全地限制迭代:

import asyncstdlib as a

async for item in a.islice(song_table.find(...), 100):
    ...
Run Code Online (Sandbox Code Playgroud)

Python3.6 添加了异步生成器,可以轻松实现异步枚举:

async def aenumerate(asequence, start=0):
    """Asynchronously enumerate an async iterator from a given start value"""
    n = start
    async for elem in asequence:
        yield n, elem
        n += 1
Run Code Online (Sandbox Code Playgroud)

对于旧版本的 Python,必须手动具体化异步生成器:

class AsyncEnumerate:
    """Asynchronously enumerate an async iterator from a given start value"""
    def __init__(self, asequence, start=0):
        self._asequence = asequence
        self._value = start

    async def __anext__(self):
        elem = await self._asequence.__anext__()
        value, self._value = self._value, self._value + 1
        return value, elem

    def __aiter__(self):
        return self
Run Code Online (Sandbox Code Playgroud)


Vin*_*ent 4

如果您不介意有外部依赖项,您可以使用aiostream.stream.enumerate

async for i, item in aiostream.stream.enumerate(cursor):
    ...
Run Code Online (Sandbox Code Playgroud)

请参阅此演示文档中的更多示例。