基于生成器的协同程序看似无限递归

Jim*_*ard 11 python recursion generator coroutine python-3.x

以下内容摘自David Beazley关于发电机的幻灯片(这里有感兴趣的人).

Task定义了一个类,它包含一个生成期货的生成器,Task类,完整(没有错误处理),如下:

class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)
Run Code Online (Sandbox Code Playgroud)

在一个示例中,还定义了以下递归函数:

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
   yield pool.submit(time.sleep, 0.001)
   print("Tick :", n)
   Task(recursive(n+1)).step()
Run Code Online (Sandbox Code Playgroud)

以下两个案例:

  1. 从Python REPL中,如果我们定义它们(或者如果我们将它们放在一个文件中则导入它们),然后使用以下命令跳转启动递归:

    Task(recursive(0)).step()
    
    Run Code Online (Sandbox Code Playgroud)

    它开始打印,似乎已超过递归限制.它显然不会超过它,打印堆栈级别表明它在整个执行过程中保持不变.还有其他事情我不太明白.

    注意:如果你这样执行它,你需要杀死python进程.

  2. 如果我们将所有内容(Task,recursive)放在一个文件中:

    if __name__ == "__main__":
        Task(recursive(0)).step()
    
    Run Code Online (Sandbox Code Playgroud)

    然后运行它python myfile.py,它停止滴答7(max_workers似乎)的数量.


我的问题是它是如何看似超过递归限制的,为什么它根据你执行的方式表现不同?

这个行为出现在Python 3.6.2和Python 3.5.4上(我猜其他人也是3.63.5家人一样).

Blc*_*ght 11

recursive您显示的生成器实际上不会以导致系统递归限制问题的方式递归.

要了解为什么需要注意recursive生成器代码何时运行.与普通函数不同,只是调用recursive(0)不会导致它立即运行其代码并进行额外的递归调用.相反,调用recursive(0)立即返回一个生成器对象.只有当你send()到发生器时代码才会运行,并且只有在你send()第二次运行它之后它才会启动另一个调用.

让我们在代码运行时检查调用堆栈.在顶层,我们运行Task(recursive(0)).step().这按顺序完成了三件事:

  1. recursive(0) 此调用立即返回生成器对象.
  2. Task(_)Task对象被创建,并且其__init__方法存储在第一步骤中创建的生成器对象的引用.
  3. _.step()调用任务的方法.这是行动真正开始的地方!让我们看一下调用中发生的事情:

    • fut = self._gen.send(value)在这里,我们实际上通过向它发送值来启动生成器运行.让我们更深入地看看生成器代码运行:
      • yield pool.submit(time.sleep, 0.001)这计划在另一个线程中完成某些事情.我们不等待它发生.相反,我们得到一个Future我们可以用来在完成时得到通知的.我们立即回到上一级代码.
    • fut.add_done_callback(self._wakeup)在这里,我们要求_wakeup()在未来准备就绪时调用我们的方法.这总是立即返回!
    • step方法现在结束.那是对的,我们已经完成了(目前)!这对于你的问题的第二部分很重要,我将在稍后讨论.
  4. 我们的调用结束了,所以如果我们以交互方式运行,控制流将返回到REPL.如果我们作为脚本运行,则解释器将到达脚本的末尾并开始关闭(我将在下面更详细地讨论).但是,线程池控制的其他线程仍然在运行,并且在某些时候,其中一个线程将执行我们关心的一些事情!让我们看看那是什么.

  5. 当调度的function(time.sleep)运行完毕后,它运行的线程将调用我们在Future对象上设置的回调.也就是说,它将调用我们之前创建Task._wakup()Task对象(我们在顶层不再Future引用它,但保留了引用,因此它仍然存在).我们来看看方法:

    • result = fut.result()存储延期呼叫的结果.在这种情况下,这是无关紧要的,因为我们从不查看结果(None无论如何).
    • self.step(result)再来一次!现在我们回到我们关心的代码.让我们看看它这次做了什么:
      • fut = self._gen.send(value)再次发送到发电机,所以它接管.它已经产生了一次,所以这次我们刚开始yield:
        • print("Tick :", n) 这很简单.
        • Task(recursive(n+1)).step()这是事情变得有趣的地方.这条线就像我们开始的那样.因此,像以前一样,这将运行上面列出的逻辑1-4(包括它们的子步骤).但是,当step()方法返回时,它不会返回到REPL或结束脚本,而是返回到此处.
        • recursive()发电机(原单,不是我们刚创建的新的)已走到了尽头.因此,就像任何到达其代码末尾的生成器一样,它会提升StopIteration.
      • StopIteration被捕获并通过忽略try/ except块,并且step()方法结束.
    • _wakup()方法也结束,因此回调完成.
  6. 最终,Task也会调用先前回调中创建的回调.所以我们回过头来,一遍又一遍地重复步骤5(如果我们以交互方式运行).

上面的调用堆栈解释了为什么交互式案例永远打印.主线程返回到REPL(如果你可以看到其他线程的输出,你可以用它来做其他事情).但是在池中,每个线程从其自己的作业的回调中调度另一个作业.当下一个作业完成时,其回调计划另一个作业,依此类推.

那么,当您将代码作为脚本运行时,为什么只能获得8个打印输出?答案在上面的步骤4中暗示.当以非交互方式运行时,主线程在第一次调用return之后运行在脚本的末尾Task.step.这会提示解释器尝试关闭.

concurrent.futures.thread模块(其中ThreadPoolExecutor的定义)具有试图很好地清理当程序关闭而执行人仍然是活动的一些奇特的逻辑.它应该停止任何空闲线程,并在当前作业完成时发出任何仍在运行的信号.

该清理逻辑的确切实现以非常奇怪的方式与我们的代码交互(可能有也可能没有错误).结果是第一个线程不断给自己做更多的工作,而产生的额外工作线程在它们产生后立即退出.当执行程序启动了它想要使用的线程数时,第一个工作程序最终退出(在我们的例子中为8).

根据我的理解,这是事件的顺序.

  1. 我们导入(间接)concurrent.futures.thread模块,该模块atexit用于告诉解释器_python_exit在解释器关闭之前运行一个名为的函数.
  2. 我们创建一个ThreadPoolExecutor最大线程数为8.它不会立即生成其工作线程,但会在每次调度作业时创建一个,直到它全部为8.
  3. 我们安排了我们的第一份工作(在上一个清单的第3步深层嵌套部分).
  4. 执行程序将作业添加到其内部队列,然后通知它没有最大数量的工作线程并启动一个新线程.
  5. 新线程将作业从队列中弹出并开始运行它.但是,sleep调用比其他步骤花费的时间要长得多,所以线程会在这里停留一段时间.
  6. 主线程完成(它已到达上一个列表中的第4步).
  7. _python_exit函数由解释器调用,因为解释器想要关闭.该函数_shutdown在模块中设置一个全局变量,并将一个发送None到执行程序的内部队列(它None每个线程发送一个,但是到目前为止只创建了一个线程,所以它只发送一个None).然后它阻塞主线程,直到它知道的线程退出.这会延迟解释器的关闭.
  8. 工作线程的回调调用time.sleep.它调用在其作业中注册的回调函数,该函数Future调度另一个作业.
  9. 与此列表的第4步一样,执行程序将作业排队,并启动另一个线程,因为它还没有所需的数字.
  10. 新线程尝试从内部队列中获取作业,但从None步骤7 获取值,这是可以完成的信号.它看到_shutdown全局设置,因此它退出.但在此之前,它会None在队列中添加另一个.
  11. 第一个工作线程完成其回调.它查找一个新作业,并在步骤8中找到它自己排队的那个作业.它开始运行作业,就像在步骤5中,这需要一段时间.
  12. 但是没有其他事情发生,因为第一个工作者是当前唯一的活动线程(主线程被阻塞等待第一个工作者死亡,而另一个工作者自行关闭).
  13. 我们现在重复步骤8-12几次.第一个工作线程将第三个到第八个作业排队,执行程序每次都产生一个相应的线程,因为它没有完整的集合.但是,每个线程立即死亡,因为它None从作业队列中取出而不是完成实际作业.第一个工作线程最终完成所有实际工作.
  14. 最后,在第8个工作之后,某些工作方式不同.这次,当回调调度另一个作业时,不会产生额外的线程,因为执行程序知道它已经启动了所请求的8个线程(它不知道7已经关闭).
  15. 所以这一次,None内部作业队列的头部由第一个工作人员(而不是实际工作)获取.这意味着它会关闭,而不是做更多的工作.
  16. 当第一个工作程序关闭时,主线程(它一直在等待它退出)最终可以解除阻塞并且该_python_exit功能完成.这使得解释器可以完全关闭.我们完成了!

这解释了我们看到的输出!我们获得了8个输出,所有输出都来自同一个工作线程(第一个产生的线程).

我认为在该代码中可能存在竞争条件.如果步骤11发生在步骤10之前,事情可能会中断.如果第一个工人None离开队列而另一个新产生的工人得到了真正的工作,那么他们的交换角色(第一个工人会死,另一个人会完成其余的工作,除非更多的竞争条件这些步骤的后续版本).但是,一旦第一个工人死亡,主线程就会被解除阻塞.由于它不知道其他线程(因为当它使得它的线程列表等待时它们不存在),它将过早地关闭解释器.

我不确定这场比赛是否有可能发生.我猜这是不太可能的,因为新线程启动和从队列中获取作业之间的代码路径长度比现有线程完成回调的路径要短得多(排队之后的部分)新工作)然后在队列中寻找另一个工作.

我怀疑ThreadPoolExecutor当我们将代码作为脚本运行时,让我们干净地退出是一个错误._shutdown除了执行者自己的self._shutdown属性之外,排队新作业的逻辑应该检查全局标志.如果是这样,在主线程完成后尝试排队另一个作业会引发异常.

您可以通过ThreadPoolExecutorwith语句中创建我复制我认为更健全的行为:

# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()
Run Code Online (Sandbox Code Playgroud)

主线程从step()调用返回后很快就会崩溃.它看起来像这样:

exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
    callback(self)
  File ".\task_coroutines.py", line 21, in _wakeup
    self.step(result)
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
    Task(recursive(n+1)).step()
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Run Code Online (Sandbox Code Playgroud)