ton*_*uan 4 python multiprocessing
我有多个正在运行的Process线程,我想将它们与所有 timeout参数一起加入。我知道,如果没有超时的必要,我可以写:
for thread in threads:
thread.join()
Run Code Online (Sandbox Code Playgroud)
我想到的一种解决方案是使用将所有线程连接在一起的主线程,然后尝试连接该线程。但是,我在Python中收到以下错误:
AssertionError: can only join a child process
Run Code Online (Sandbox Code Playgroud)
我的代码如下。
def join_all(threads):
for thread in threads:
thread.join()
if __name__ == '__main__':
for thread in threads:
thread.start()
master = multiprocessing.Process(target=join_all, args=(threads,))
master.start()
master.join(timeout=60)
Run Code Online (Sandbox Code Playgroud)
您可以反复循环遍历每个线程,进行非阻塞检查以查看线程是否完成:
import time
def timed_join_all(threads, timeout):
start = cur_time = time.time()
while cur_time <= (start + timeout):
for thread in threads:
if not thread.is_alive():
thread.join()
time.sleep(1)
cur_time = time.time()
if __name__ == '__main__':
for thread in threads:
thread.start()
timed_join_all(threads, 60)
Run Code Online (Sandbox Code Playgroud)
这个答案最初基于 dano 的答案,但有一些变化。
\n\njoin_all
获取线程列表和超时(以秒为单位)并尝试加入所有线程。它通过进行非阻塞调用来实现这一点Thread.join
(通过将超时设置为0
,因为join
没有参数将永远不会超时)。
一旦所有线程完成(通过检查is_alive()
每个线程),循环将提前退出。
如果在超时发生时某些线程仍在运行,则该函数将引发RuntimeError
并包含有关剩余线程的信息。
import time\n\ndef join_all(threads, timeout):\n """\n Args:\n threads: a list of thread objects to join\n timeout: the maximum time to wait for the threads to finish\n Raises:\n RuntimeError: is not all the threads have finished by the timeout\n """\n start = cur_time = time.time()\n while cur_time <= (start + timeout):\n for thread in threads:\n if thread.is_alive():\n thread.join(timeout=0)\n if all(not t.is_alive() for t in threads):\n break\n time.sleep(0.1)\n cur_time = time.time()\n else:\n still_running = [t for t in threads if t.is_alive()]\n num = len(still_running)\n names = [t.name for t in still_running]\n raise RuntimeError(\'Timeout on {0} threads: {1}\'.format(num, names))\n\nif __name__ == \'__main__\':\n for thread in threads:\n thread.start()\n\n join_all(threads, 60)\n
Run Code Online (Sandbox Code Playgroud)\n\n在我对此的使用中,它位于一个测试套件中,其中线程是ExcThread的 d\xc3\xa6monized 版本,因此如果线程从未完成运行,也没关系。
\n