hua*_*ang 6 python python-asyncio
如下例所示,我在使用异步生成器时遇到了异常错误。
async def demo():
async def get_data():
for i in range(5): # loop: for or while
await asyncio.sleep(1) # some IO code
yield i
datas = get_data()
await asyncio.gather(
anext(datas),
anext(datas),
anext(datas),
anext(datas),
anext(datas),
)
if __name__ == '__main__':
# asyncio.run(main())
asyncio.run(demo())
Run Code Online (Sandbox Code Playgroud)
控制台输出:
2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
asyncio.run(demo())
File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
return future.result()
File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running
Run Code Online (Sandbox Code Playgroud)
情况描述:我有一个循环逻辑,每次从Redis中获取一批数据,我想使用yield返回结果。但是当我创建并发任务时就会出现这个错误。
针对这种情况有没有好的解决办法呢?我并不是要改变我现在使用它的方式,而是想看看我是否可以判断它是否正在运行或类似锁之类的东西并等待它运行然后执行下一个。
也许我现在的逻辑不太合理,但我也想听懂一些批判性的语言,让我意识到这件事的严重性。
感谢您的帮助。
Ant*_*lov 11
异步生成器非常适合并行消耗。请参阅下面我的解释。作为正确的解决方法,用于asyncio.Queue生产者和消费者之间的通信:
queue = asyncio.Queue()
async def producer():
for item in range(5):
await asyncio.sleep(random.random()) # imitate async fetching
print('item fetched:', item)
await queue.put(item)
async def consumer():
while True:
item = await queue.get()
await asyncio.sleep(random.random()) # imitate async processing
print('item processed:', item)
await asyncio.gather(producer(), consumer(), consumer())
Run Code Online (Sandbox Code Playgroud)
上面的代码片段适用于无限的项目流:例如,一个 Web 服务器,它永远运行以服务来自客户端的请求。但是如果我们需要处理有限数量的项目怎么办?我们应该如何consumer知道何时停止?
这值得在 Stack Overflow 上提出另一个问题来涵盖所有替代方案,但最简单的选择是一种sentinel方法,如下所述。
介绍一个sentinel = object(). 当从外部数据源获取所有项目并将其放入队列时,producer必须将尽可能多的sentinel数据推送到队列中consumer。一旦 aconsumer获取了sentinel,它就知道应该停止:if item is sentinel: breakfrom 循环。
sentinel = object()
consumers_count = 2
async def producer():
... # the same code as above
if new_item is None: # if no new data
for _ in range(consumers_count):
await queue.put(sentinel)
async def consumer():
while True:
... # the same code as above
if item is sentinel:
break
await asyncio.gather(
producer(),
*(consumer() for _ in range(consumers_count)),
)
Run Code Online (Sandbox Code Playgroud)
由于您不需要更改异步生成器方法,因此这里有一个基于 asyncgen 的替代方案。要解决此问题(以简单但肮脏的方式),您可以使用锁包装源异步生成器:
async def with_lock(agen, lock: asyncio.Lock):
while True:
async with lock: # only one consumer is allowed to read
try:
item = await anext(agen)
except StopAsyncIteration:
break
# exiting lock section => let others consume
yield item # consumer processes an item asyncly
lock = asyncio.Lock() # a common lock for all consumers
await asyncio.gather(
# every consumer must have its own "wrapped" generator
anext(with_lock(datas, lock)),
anext(with_lock(datas, lock)),
...
)
Run Code Online (Sandbox Code Playgroud)
这将确保一次只有一个消费者等待来自生成器的一项。当该消费者等待时,其他消费者正在执行,因此并行性不会丢失。
重要的!在上述方法中,不要作为 yield await anext(agen)下的单个表达式lock:您的包装生成器将暂停 (on yield)lock 释放,并且其他消费者将无法并行使用另一个项目。即只anext用锁包装调用,但不要yield在锁部分。
大致等效的代码async for(看起来更聪明):
async def with_lock(agen, lock: asyncio.Lock):
await lock.acquire()
async for item in agen:
lock.release()
yield item
await lock.acquire()
lock.release()
Run Code Online (Sandbox Code Playgroud)
但是,此代码仅处理异步生成器anext方法。而生成器 API 还包括aclose和athrow方法。请参阅下面的解释。
尽管您with_lock也可以向函数添加对这些的支持,但我建议对生成器进行子类化并处理内部的锁支持,或者更好地使用Queue上面基于 - 的方法。
看contextlib.aclosing有没有一些灵感。
同步和异步生成器都有一个特殊的属性:(.gi_running对于常规生成器)和.ag_running(对于异步生成器)。dir您可以通过在生成器上执行来发现它们:
>>> dir((i for i in range(0))
[..., 'gi_running', ...]
Run Code Online (Sandbox Code Playgroud)
它们被设置为当执行True生成器的.__next__or方法时(.__anext__next(...)方法时(并且anext(...)只是这些的语法糖)。
next(...)当同一生成器上的另一个调用已经执行时,这可以防止在生成器上重新执行next(...):如果运行标志为True:如果运行标志为,则会引发异常(对于同步生成器,它会引发ValueError: generator already executing)。
因此,回到您的示例,当您运行await anext(datas)(通过asyncio.gather)时,会发生以下情况:
datas.ag_running被设定为True。datas.__anext__方法。await一旦到达方法内部的内部语句__anext__(await asyncio.sleep(1)在您的情况下),asyncio的循环就会切换到另一个使用者。await anext(datas),但由于datas.ag_running标志仍然设置为True,这会导致RuntimeError.生成器的执行可以暂停和恢复。但仅限于yield陈述。因此,如果发电机在内部暂停await,则它无法“恢复”,因为它的状态不允许这样做。
这就是为什么并行next/anext调用生成器会引发异常:它尚未准备好恢复,它已经在运行。
athrow和aclose生成器的 API(同步和异步)不仅包括send/asend迭代方法,还包括:
close/aclose在退出或异常时释放生成器分配的资源(例如数据库连接)throw/athrow通知生成器它必须处理异常。aclose也是athrow异步方法。这意味着如果两个消费者尝试并行关闭/抛出底层生成器,您将遇到相同的问题,因为生成器将在关闭(引发异常)时再次关闭(或处理异常)。
尽管这是异步生成器的常见情况,但为同步生成器复制它并不是那么幼稚,因为同步next(...)调用很少被中断。
中断同步生成器的方法之一是运行多线程代码,其中多个使用者(在并行线程中运行)从单个生成器读取数据。在这种情况下,当生成器的代码在执行调用时被中断时next,所有其他消费者的并行调用尝试next将导致异常。
实现这一点的另一种方法在与生成器相关的 PEP #255中通过自消耗生成器进行了演示:
>>> def g():
... i = next(me)
... yield i
...
>>> me = g()
>>> next(me)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in g
ValueError: generator already executing
Run Code Online (Sandbox Code Playgroud)
当outernext(me)被调用时,它设置me.gi_running为True然后执行生成器函数代码。随后的内部next(me)调用会导致ValueError.
生成器(尤其是异步生成器)在由单个读取器使用时效果最佳。多个消费者的支持很困难,因为需要修补所有生成器方法的行为,因此不鼓励。
| 归档时间: |
|
| 查看次数: |
2312 次 |
| 最近记录: |