带有上下文管理器的 ThreadPoolExecutor“关闭后无法安排新的期货”

sad*_*ave 6 python-multithreading python-3.x threadpoolexecutor concurrent.futures python-3.8

我正在创建一个线程管理器类,该类将执行任务作为线程处理并将结果传递给下一个流程步骤。该流程在接收任务的第一次执行时正常工作,但第二次执行失败并出现以下错误:

...python3.8/concurrent/futures/thread.py", line 179, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Run Code Online (Sandbox Code Playgroud)

任务来自Cmd.cmdloop用户输入 - 因此,脚本是持久的,不会关闭。相反,run将被多次调用,因为从用户那里收到输入。

我已经实现了一个ThreadPoolExecutor来处理工作负载并尝试按时间顺序收集结果,concurrent.futures.as_completed以便每个项目按照完成的顺序处理到下一步。

run下面的方法非常适合第一次执行,但在第二次执行同一任务(在第一次执行期间成功)时返回错误。

  def run ( self, _executor=None, _futures={}, ) -> bool :
     
     task = self.pipeline.get( )
     
     with _executor or self.__default_executor as executor :
         
         _futures = { executor.submit ( task.target.execute, ), }

         for future in concurrent.futures.as_completed ( _futures, ) :
                     
             print( future.result ( ) )
                             
     return True
Run Code Online (Sandbox Code Playgroud)

因此,我们的想法是每次调用run都会创建和拆除executor上下文。但是错误表明上下文在第一次执行后正确关闭,并且run在第二次迭代期间调用时无法重新打开/重新创建......这个错误指向什么?..我错过了什么?

任何帮助都会很棒 - 提前致谢。

ALU*_*FTW 1

最简单的解决方案是使用多处理库,而不是使用上下文管理器发送 futures 和 ThreadPoolExecutore:

    pool = ThreadPool(50)
    pool.starmap(test_function, zip(array1,array2...))
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

While(array1[0] , array2[0])将是发送到第一个线程、第二个线程的函数“test_function”的值(array1[1] , array2[1]),依此类推。