Python-超时加入多个线程

ton*_*uan 4 python multiprocessing

我有多个正在运行的Process线程,我想将它们与所有 timeout参数一起加入。我知道,如果没有超时的必要,我可以写:

for thread in threads:
    thread.join()
Run Code Online (Sandbox Code Playgroud)

我想到的一种解决方案是使用将所有线程连接在一起的主线程,然后尝试连接该线程。但是,我在Python中收到以下错误:

AssertionError: can only join a child process
Run Code Online (Sandbox Code Playgroud)

我的代码如下。

def join_all(threads):
    for thread in threads:
        thread.join()

if __name__ == '__main__':
    for thread in threads:
        thread.start()

    master = multiprocessing.Process(target=join_all, args=(threads,))
    master.start()
    master.join(timeout=60)
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 5

您可以反复循环遍历每个线程,进行非阻塞检查以查看线程是否完成:

import time

def timed_join_all(threads, timeout):
    start = cur_time = time.time()
    while cur_time <= (start + timeout):
        for thread in threads:
            if not thread.is_alive():
                thread.join()
        time.sleep(1)
        cur_time = time.time()

if __name__ == '__main__':
    for thread in threads:
        thread.start()

    timed_join_all(threads, 60)
Run Code Online (Sandbox Code Playgroud)


Mil*_*ams 5

这个答案最初基于 dano 的答案,但有一些变化。

\n\n

join_all获取线程列表和超时(以秒为单位)并尝试加入所有线程。它通过进行非阻塞调用来实现这一点Thread.join(通过将超时设置为0,因为join没有参数将永远不会超时)。

\n\n

一旦所有线程完成(通过检查is_alive()每个线程),循环将提前退出。

\n\n

如果在超时发生时某些线程仍在运行,则该函数将引发RuntimeError并包含有关剩余线程的信息。

\n\n
import time\n\ndef join_all(threads, timeout):\n    """\n    Args:\n        threads: a list of thread objects to join\n        timeout: the maximum time to wait for the threads to finish\n    Raises:\n        RuntimeError: is not all the threads have finished by the timeout\n    """\n    start = cur_time = time.time()\n    while cur_time <= (start + timeout):\n        for thread in threads:\n            if thread.is_alive():\n                thread.join(timeout=0)\n        if all(not t.is_alive() for t in threads):\n            break\n        time.sleep(0.1)\n        cur_time = time.time()\n    else:\n        still_running = [t for t in threads if t.is_alive()]\n        num = len(still_running)\n        names = [t.name for t in still_running]\n        raise RuntimeError(\'Timeout on {0} threads: {1}\'.format(num, names))\n\nif __name__ == \'__main__\':\n    for thread in threads:\n        thread.start()\n\n    join_all(threads, 60)\n
Run Code Online (Sandbox Code Playgroud)\n\n

在我对此的使用中,它位于一个测试套件中,其中线程是ExcThread的 d\xc3\xa6monized 版本,因此如果线程从未完成运行,也没关系。

\n