ami*_*che 5 python-3.x python-asyncio
我认为在代码方面更容易理解:
async for item in timeout(something(), timeout=60, sentinel=None):
if item is not None:
await do_something_useful(item)
await refresh()
Run Code Online (Sandbox Code Playgroud)
我希望协程refresh至少每60秒执行一次。
我需要做这样的事情来创建一个 websocket(也是一个异步迭代器),如果它在一段时间后没有收到消息就会超时。我决定了以下几点:
socket_iter = socket.__aiter__()
try:
while True:
message = await asyncio.wait_for(
socket_iter.__anext__(),
timeout=10
)
except asyncio.futures.TimeoutError:
# streaming is completed
pass
Run Code Online (Sandbox Code Playgroud)
AsyncTimedIterable可能是您的代码中的实现timeout():
class _AsyncTimedIterator:
__slots__ = ('_iterator', '_timeout', '_sentinel')
def __init__(self, iterable, timeout, sentinel):
self._iterator = iterable.__aiter__()
self._timeout = timeout
self._sentinel = sentinel
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
except asyncio.TimeoutError:
return self._sentinel
class AsyncTimedIterable:
__slots__ = ('_factory', )
def __init__(self, iterable, timeout=None, sentinel=None):
self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)
def __aiter__(self):
return self._factory()
Run Code Online (Sandbox Code Playgroud)
(原答案)
或者使用此类来替换您的timeout()函数:
class AsyncTimedIterable:
def __init__(self, iterable, timeout=None, sentinel=None):
class AsyncTimedIterator:
def __init__(self):
self._iterator = iterable.__aiter__()
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(),
timeout)
except asyncio.TimeoutError:
return sentinel
self._factory = AsyncTimedIterator
def __aiter__(self):
return self._factory()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1010 次 |
| 最近记录: |