如何通过超时迭代异步迭代器?

ami*_*che 5 python-3.x python-asyncio

我认为在代码方面更容易理解:

async for item in timeout(something(), timeout=60, sentinel=None):
    if item is not None:
        await do_something_useful(item)
    await refresh()
Run Code Online (Sandbox Code Playgroud)

我希望协程refresh至少每60秒执行一次。

Adv*_*bly 5

我需要做这样的事情来创建一个 websocket(也是一个异步迭代器),如果它在一段时间后没有收到消息就会超时。我决定了以下几点:

socket_iter = socket.__aiter__()
try:
    while True:
        message = await asyncio.wait_for(
            socket_iter.__anext__(),
            timeout=10
        )
except asyncio.futures.TimeoutError:
    # streaming is completed
    pass
Run Code Online (Sandbox Code Playgroud)


Fan*_*ing 3

AsyncTimedIterable可能是您的代码中的实现timeout()

class _AsyncTimedIterator:

    __slots__ = ('_iterator', '_timeout', '_sentinel')

    def __init__(self, iterable, timeout, sentinel):
        self._iterator = iterable.__aiter__()
        self._timeout = timeout
        self._sentinel = sentinel

    async def __anext__(self):
        try:
            return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
        except asyncio.TimeoutError:
            return self._sentinel


class AsyncTimedIterable:

    __slots__ = ('_factory', )

    def __init__(self, iterable, timeout=None, sentinel=None):
        self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

    def __aiter__(self):
        return self._factory()
Run Code Online (Sandbox Code Playgroud)

(原答案)

或者使用此类来替换您的timeout()函数:

class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None, sentinel=None):
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    return await asyncio.wait_for(self._iterator.__anext__(),
                                                  timeout)
                except asyncio.TimeoutError:
                    return sentinel

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()
Run Code Online (Sandbox Code Playgroud)