Ale*_*nov 29 python iterator coroutine async-await
想象一下,我们有一个迭代器,比方说iter(range(1, 1000)).我们有两种功能,每接受一个迭代器作为唯一的参数,说sum()和max().在SQL世界中,我们将其称为聚合函数.
有没有办法在不缓冲迭代器输出的情况下获得两者的结果?
要做到这一点,我们需要暂停和恢复聚合函数执行,以便为它们提供相同的值而不存储它们.也许是否有一种方法可以在没有睡眠的情况下使用异步事物来表达它?
use*_*342 42
让我们考虑如何将两个聚合函数应用于同一个迭代器,我们只能耗尽一次.最初的尝试(硬编码sum和max简洁,但可以简单地推广到任意数量的聚合函数)可能如下所示:
def max_and_sum_buffer(it):
content = list(it)
p = sum(content)
m = max(content)
return p, m
Run Code Online (Sandbox Code Playgroud)
这种实现的缺点是它将所有生成的元素一次存储在内存中,尽管这两个函数完全能够进行流处理.这个问题预计会出现这个问题,并明确要求在不缓冲迭代器输出的情况下生成结果.是否有可能做到这一点?
这当然看起来可能.毕竟,Python迭代器是外部的,因此每个迭代器都能够自行挂起.提供一个将迭代器拆分为两个提供相同内容的新迭代器的适配器有多难?实际上,这正是描述itertools.tee,它看起来非常适合并行迭代:
def max_and_sum_tee(it):
it1, it2 = itertools.tee(it)
p = sum(it1) # XXX
m = max(it2)
return p, m
Run Code Online (Sandbox Code Playgroud)
上面会产生正确的结果,但不会像我们希望的那样工作.麻烦的是我们没有并行迭代.聚合函数like sum和maxnever suspend - 每个函数都坚持在生成结果之前使用所有迭代器内容.因此,在有机会运行之前sum会耗尽.单独留下的耗尽元素将导致这些元素累积在两个迭代器之间共享的内部FIFO中.这在这里是不可避免的 - 因为必须看到相同的元素,别无选择,只能积累它们.(有关更多有趣的详细信息,请参阅此帖.)it1maxit1it2max(it2)teetee
换句话说,该实现与第一个实现之间没有区别,除了第一个实现至少使缓冲显式化.为了消除缓冲,sum并且max必须并行运行,而不是一前一后.
让我们看看如果我们在单独的线程中运行聚合函数会发生什么,仍然使用tee复制原始迭代器:
def max_and_sum_threads_simple(it):
it1, it2 = itertools.tee(it)
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(it1))
max_future = executor.submit(lambda: max(it2))
return sum_future.result(), max_future.result()
Run Code Online (Sandbox Code Playgroud)
现在sum并且max实际上并行运行(与GIL允许的一样多),线程由优秀的concurrent.futures模块管理.它有一个致命的缺陷,但是:对于tee没有对数据进行缓冲,sum并max必须以完全相同的速率来处理自己的物品.如果一个比另一个快一点,它们将分开,tee并将缓冲所有中间元素.由于无法预测每个运行的速度有多快,因此缓冲量无法预测,并且具有缓冲所有内容的最坏情况.
为确保不发生缓冲,tee必须使用自定义生成器替换,该生成器不会缓冲任何内容并阻塞,直到所有使用者都观察到之前的值,然后再继续执行下一个值.和以前一样,每个使用者都在自己的线程中运行,但是现在调用线程正在忙于运行生成器,这个循环实际上遍历源迭代器并发出新值可用的信号.这是一个实现:
def max_and_sum_threads(it):
STOP = object()
next_val = None
consumed = threading.Barrier(2 + 1) # 2 consumers + 1 producer
val_id = 0
got_val = threading.Condition()
def send(val):
nonlocal next_val, val_id
consumed.wait()
with got_val:
next_val = val
val_id += 1
got_val.notify_all()
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
last_val_id = -1
while True:
consumed.wait()
with got_val:
got_val.wait_for(lambda: val_id != last_val_id)
if next_val is STOP:
return
yield next_val
last_val_id = val_id
with concurrent.futures.ThreadPoolExecutor(2) as executor:
sum_future = executor.submit(lambda: sum(consume()))
max_future = executor.submit(lambda: max(consume()))
produce()
return sum_future.result(), max_future.result()
Run Code Online (Sandbox Code Playgroud)
对于概念上如此简单的事情,这是相当多的代码,但它对于正确的操作是必要的.
produce()循环遍历外部迭代器并将项目一次发送给使用者一个值.它使用了一个屏障,一个在Python 3.2中添加的方便的同步原语,等待所有消费者完成旧值,然后用新的值覆盖它next_val.一旦新值准备就绪,就会广播一个条件.consume()是一个发生器,它在到达时发送产生的值,直到检测到STOP.通过在循环中创建使用者并在创建屏障时调整其编号,可以通过并行运行任意数量的聚合函数来泛化代码.
这个实现的缺点是它需要创建线程(可能通过使线程池全局化来缓解)以及在每次迭代传递时进行大量非常仔细的同步.这种同步破坏了性能 - 这个版本比单线程慢了近2000倍tee,比简单但非确定性的线程版本慢了475倍.
但是,只要使用线程,就不会以某种形式避免同步.为了完全消除同步,我们必须放弃线程并切换到协作式多任务.现在的问题是是否有可能暂停像普通的同步功能的执行sum,并max以它们之间进行切换?
事实证明,greenlet第三方扩展模块可以实现这一点.Greenlets是一种纤维的实现,轻量级的微线程可以明确地在彼此之间切换.这有点像Python生成器,yield用于挂起,除了greenlets提供了更灵活的挂起机制,允许人们选择挂起的人.
这使得将线程版本移植max_and_sum到greenlets 相当容易:
def max_and_sum_greenlet(it):
STOP = object()
consumers = None
def send(val):
for g in consumers:
g.switch(val)
def produce():
for elem in it:
send(elem)
send(STOP)
def consume():
g_produce = greenlet.getcurrent().parent
while True:
val = g_produce.switch()
if val is STOP:
return
yield val
sum_result = []
max_result = []
gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
gsum.switch()
gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
gmax.switch()
consumers = (gsum, gmax)
produce()
return sum_result[0], max_result[0]
Run Code Online (Sandbox Code Playgroud)
逻辑是相同的,但代码较少.和以前一样,produce生成从源迭代器检索的值,但是它send不需要同步,因为当所有内容都是单线程时它不需要.相反,它明确地切换到每个消费者来做它的事情,消费者尽职尽责地转回去.在经历了所有消费者之后,生产者已准备好进行下一次迭代过程.
使用中间单元素列表检索结果,因为greenlet不提供对目标函数的返回值的访问(也没有threading.Thread,这就是我们选择concurrent.futures上面的原因).
但是,使用greenlets有一些缺点.首先,它们没有附带标准库,您需要安装greenlet扩展.然后,greenlet本质上是不可移植的,因为操作系统和编译器不支持堆栈切换代码,并且可以被认为是一种破解(尽管非常聪明).针对WebAssembly或JVM或GraalVM的 Python 不太可能支持greenlet.这不是一个紧迫的问题,但从长远来看,这绝对是值得注意的事情.
从Python 3.5开始,Python提供了本机协同程序.与greenlet不同,类似于生成器,协同程序与常规函数不同,必须使用async def.协同程序不能轻易地从同步代码执行,它们必须由调度程序处理,调度程序驱动它们完成.调度程序也称为事件循环,因为它的另一个任务是接收IO事件并将它们传递给适当的回调和协同程序.在标准库中,这是asyncio模块的作用.
在实现基于asyncio之前max_and_sum,我们必须首先解决障碍.与greenlet不同,asyncio只能暂停协程的执行,而不能执行任意函数.所以我们需要替换sum和max基本相同的协同程序.这就像以显而易见的方式实现它们一样简单,只需替换for为async for,使异步迭代器在等待下一个值到达时挂起协同程序:
async def asum(it):
s = 0
async for elem in it:
s += elem
return s
async def amax(it):
NONE_YET = object()
largest = NONE_YET
async for elem in it:
if largest is NONE_YET or elem > largest:
largest = elem
if largest is NONE_YET:
raise ValueError("amax() arg is an empty sequence")
return largest
# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
# return accumulate(it, initializer=0)
#def amax(it):
# return accumulate(it, max)
Run Code Online (Sandbox Code Playgroud)
人们可以合理地询问提供一对新的聚合函数是否在作弊; 毕竟,以前的解决方案都小心使用现有sum和max内置插件.答案将取决于问题的确切解释,但我认为新功能是允许的,因为它们绝不是特定于手头的任务.它们与内置函数完全相同,但使用异步迭代器.我怀疑这个函数在标准库中某处尚未存在的唯一原因是协同程序和异步迭代器是一个相对较新的特性.
有了这个,我们可以继续写作max_and_sum协程:
async def max_and_sum_asyncio(it):
loop = asyncio.get_event_loop()
STOP = object()
next_val = loop.create_future()
consumed = loop.create_future()
used_cnt = 2 # number of consumers
async def produce():
for elem in it:
next_val.set_result(elem)
await consumed
next_val.set_result(STOP)
async def consume():
nonlocal next_val, consumed, used_cnt
while True:
val = await next_val
if val is STOP:
return
yield val
used_cnt -= 1
if not used_cnt:
consumed.set_result(None)
consumed = loop.create_future()
next_val = loop.create_future()
used_cnt = 2
else:
await consumed
s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
produce())
return s, m
Run Code Online (Sandbox Code Playgroud)
虽然这个版本是基于单个线程内的协同程序之间的切换,就像使用greenlet的那个,它看起来不同.asyncio不提供协同程序的显式切换,它基于await暂停/恢复原语的任务切换.目标await可以是另一个协程,但也是一个抽象的"未来",一个值占位符,稍后将被其他协同程序填充.一旦等待的值变得可用,事件循环将自动恢复协同程序的执行,await表达式将评估为提供的值.因此produce,它不是转向消费者,而是等待所有消费者观察到产出价值后即将到来的未来.
consume()是一个异步生成器,它像一个普通的生成器,除了它创建一个异步迭代器,我们的聚合协程已经准备好接受使用async for.异步迭代器的等价物__next__被调用__anext__并且是一个协程,允许在等待新值到达时耗尽异步迭代器的协程暂停.当正在运行的异步生成器挂起时await,可以通过async for作为隐式__anext__调用的暂停来观察.consume()当它等待提供的值时,确切地说,当produce它们变得可用时,它们将它们传输到聚合协程,例如asum和amax.等待是通过next_val未来来实现的,它带有下一个元素it.等待未来内部consume()暂停异步生成器,并与它一起使用聚合协程.
与greenlet的显式切换相比,这种方法的优点在于,它可以更容易地将彼此不了解的协同程序组合到同一个事件循环中.例如,可以有两个max_and_sum并行运行的实例(在同一个线程中),或者运行一个更复杂的聚合函数,该函数调用进一步的异步代码来进行计算.
以下便捷功能显示了如何从非asyncio代码运行上述内容:
def max_and_sum_asyncio_sync(it):
# trivially instantiate the coroutine and execute it in the
# default event loop
coro = max_and_sum_asyncio(it)
return asyncio.get_event_loop().run_until_complete(coro)
Run Code Online (Sandbox Code Playgroud)
测量和比较这些方法的性能,并行执行可能会产生误导,因为sum和max几乎不用做任何处理,从而过度强调并行的开销.对待这些,就像处理任何含有大量盐的微型基准一样.话虽如此,让我们看看数字吧!
测量是使用Python 3.6生成的.函数只运行一次并给出range(10000),它们的时间通过time.time()在执行之前和之后减去来测量.结果如下:
max_and_sum_buffer和max_and_sum_tee:0.66毫秒 - 两者几乎完全相同,tee版本更快一点.
max_and_sum_threads_simple:2.7毫秒.由于非确定性缓冲,此时序意味着很少,因此这可能是测量启动两个线程的时间以及Python内部执行的同步.
max_and_sum_threads:1.29 秒,迄今为止最慢的选项,比最快的选项慢〜2000倍.这种可怕的结果可能是由在迭代的每个步骤执行的多个同步的组合以及它们与GIL的交互引起的.
max_and_sum_greenlet:25.5 ms,与初始版本相比较慢,但比线程版本快得多.使用足够复杂的聚合函数,可以想象在生产中使用此版本.
max_and_sum_asyncio:351毫秒,几乎比greenlet版本慢14倍.这是一个令人失望的结果,因为asyncio协同程序比greenlet更轻量级,并且它们之间的切换应该比在光纤之间切换快得多.运行协同调度程序和事件循环(在这种情况下,如果代码没有IO,则过度杀戮)的开销可能会破坏此微基准测试的性能.
max_and_sum_asyncio使用uvloop:125毫秒.这比常规asyncio的速度快两倍,但仍然比greenlet慢近5倍.
在PyPy下运行示例并没有带来显着的加速,实际上大多数示例运行得稍慢,即使在运行它们几次以确保JIT预热之后.asyncio函数需要重写不要使用异步生成器(因为撰写本文时PyPy实现了Python 3.5),并且在不到100ms的时间内执行.这与CPython + uvloop性能相当,即更好,但与greenlet相比并不显着.
如果它适用于您的聚合函数f(a,b,c,...) == f(a, f(b, f(c, ...))),那么您可以循环使用函数并一次为它们提供一个元素,每次将它们与前一个应用程序的结果相结合,就像reduce这样做,例如:
def aggregate(iterator, *functions):
first = next(iterator)
result = [first] * len(functions)
for item in iterator:
for i, f in enumerate(functions):
result[i] = f((result[i], item))
return result
Run Code Online (Sandbox Code Playgroud)
这比仅仅在列表中实现迭代器并在列表中作为一个整体应用聚合函数或者使用itertools.tee(在内部基本上做同样的事情)要慢得多(大约10-20倍),但是它具有以下优点:不使用额外的内存.
但请注意,虽然这适用于类似的函数sum,min或者max它不适用于其他聚合函数,例如查找迭代器的均值或中值元素,如mean(a, b, c) != mean(a, mean(b, c)).(因为mean,你当然sum可以通过元素的数量得到它除以它,但是计算例如一次只取一个元素的中位数将更具挑战性.)