实现异步迭代器

jon*_*rpe 9 python asynchronous python-3.x python-asyncio

根据PEP-492我试图实现一个异步迭代器,这样我就可以做到

async for foo in bar:
    ...
Run Code Online (Sandbox Code Playgroud)

这是一个简单的例子,类似于文档中的例子,对实例化和异步迭代进行了非常基本的测试:

import pytest

class TestImplementation:
    def __aiter__(self):
        return self
    async def __anext__(self):
        raise StopAsyncIteration


@pytest.mark.asyncio  # note use of pytest-asyncio marker
async def test_async_for():
    async for _ in TestImplementation():
        pass
Run Code Online (Sandbox Code Playgroud)

但是,当我执行我的测试套件时,我看到:

=================================== FAILURES ===================================
________________________________ test_async_for ________________________________

    @pytest.mark.asyncio
    async def test_async_for():
>       async for _ in TestImplementation():
E       TypeError: 'async for' received an invalid object from __aiter__: TestImplementation

...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================
Run Code Online (Sandbox Code Playgroud)

为什么我TestImplementation看起来无效?据我所知,它符合协议:

  1. 对象必须实现一个__aiter__方法...返回一个异步迭代器对象.
  2. 异步迭代器对象必须实现一个__anext__方法...返回一个等待的.
  3. 要停止迭代__anext__必须引发StopAsyncIteration异常.

这是最新发布的Python(3.5.1),py.test(2.9.2)和pytest-asyncio(0.4.1)版本的失败.

jon*_*rpe 28

如果你在文档中进一步阅读,它会提到(强调我的):

PEP 492在CPython 3.5.0中被接受并__aiter__定义为一种方法,预计会将等待的解析返回到异步迭代器.

在3.5.2中(由于PEP 492是临时接受的), __aiter__协议已更新为直接返回异步迭代器.

因此,对于3.5.2之前的版本(2016/6/27发布),文档与如何编写有效的异步迭代器略有不同.3.5.0和3.5.1的固定版本如下所示:

class TestImplementation:
    async def __aiter__(self):
  # ^ note
        return self
    async def __anext__(self):
        raise StopAsyncIteration
Run Code Online (Sandbox Code Playgroud)

这是在关闭错误#27243时引入的,并且在数据模型文档中稍微清楚一点,这也提出了一种编写向后兼容代码的方法.

  • @Padraic [下次jon问一个问题,远离它](https://www.youtube.com/watch?v=4F4qzPbcFiA);) (7认同)

Kev*_*inG 5

异步迭代器已在 Python 3.6 中实现 - 请参阅PEP-525

那么你根本不需要 TestImplementation 来使用async for. 您可以使用yield(来自 PEP-525 的示例):

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)
Run Code Online (Sandbox Code Playgroud)

async for然后您可以按照您的预期使用:

async for i in ticker(1, 10):                                                                     
    print(f'Tick #{i}')
Run Code Online (Sandbox Code Playgroud)

  • 这是一个生成器,与迭代器不同。并非所有迭代器都可以重构为生成器,也不应该如此。OP 问题的重点是创建一个迭代器,而不是测试“async for”循环。 (6认同)