无法在Python中创建新线程

Lad*_*ady 6 python multithreading python-multithreading

import threading

threads = []
for n in range(0, 60000):
    t = threading.Thread(target=function,args=(x, n))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
Run Code Online (Sandbox Code Playgroud)

它在我的笔记本电脑上的范围高达800,但是如果我将范围增加到800以上,我就会收到错误can't create new thread.

如何控制线程创建的数量或任何其他方式使其像超时一样工作?我尝试使用threading.BoundedSemaphore功能,但似乎没有正常工作.

aba*_*ert 18

问题是没有主要平台(截至2013年中)可以让你创建接近这个线程数的任何地方.您可能遇到各种各样的限制,如果不了解您的平台,配置以及您所遇到的确切错误,就无法知道您遇到的是哪一个.但这里有两个例子:

  • 在32位Windows上,默认线程堆栈为1MB,并且所有线程堆栈必须与程序中的其他所有内容相匹配,因此您将在60000之前耗尽.
  • 在64位Linux上,ulimit在你的页面空间不足之前,你可能会耗尽会话的一个软值.(Linux有各种不同的限制,超出了POSIX所要求的限制.)

那么,我如何控制线程的数量来创建或任何其他方式使其像超时或其他什么工作?

使用尽可能多的线程不太可能是您真正想要做的.在8核计算机上运行800个线程意味着您需要在线程之间花费大量时间进行上下文切换,并且缓存在被启动之前会持续刷新,等等.

最有可能的是,您真正想要的是以下之一:

  • 每个CPU一个线程,为60000个任务提供服务.
    • 也许是进程而不是线程(如果主要工作是在Python中,或者在C代码中没有明确释放GIL).
    • 也许固定数量的线程(例如,网络浏览器可以做,例如,一次12个并发请求,无论你有1个核心还是64个).
    • 也许是一组,比如600个批次的100个任务,而不是60000个单任务.
  • 60000个合作调度的光纤/ greenlets /微线程共享一个真正的线程.
    • 也许是显式协程而不是调度程序.
    • 或者通过例如"魔术"合作社gevent.
    • 每个CPU可能有一个线程,每个线程运行1/N的光纤.

但这当然是可能的.

一旦你达到了你所达到的任何限制,很可能再次尝试将失败,直到一个线程完成其工作并加入,并且很有可能在此之后再次尝试将成功.因此,鉴于您显然正在获得异常,您可以像处理Python中的任何其他方式一样处理此问题:使用try/ exceptblock.例如,像这样:

threads = []
for n in range(0, 60000):
    while True:
        t = threading.Thread(target=function,args=(x, n))
        try:
            t.start()
            threads.append(t)
        except WhateverTheExceptionIs as e:
            if threads:
                threads[0].join()
                del threads[0]
            else:
                raise
        else:
            break
for t in threads:
    t.join()
Run Code Online (Sandbox Code Playgroud)

当然,这假设启动的第一个任务可能是完成的第一个任务之一.如果不是这样,你需要一些方法来明确表示完成(条件,信号量,队列等),或者你需要使用一些较低级别(特定于平台的)库来为你提供一种方法.等待整个列表,直到至少一个线程完成.

另外,请注意,在某些平台(例如,Windows XP)上,您可以获得接近极限的奇怪行为.


除了更好,做正确的事情也可能更简单.例如,这是一个每CPU进程池:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)
Run Code Online (Sandbox Code Playgroud)

...和一个固定线程计数池:

with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)
Run Code Online (Sandbox Code Playgroud)

...和平衡CPU并行 - 与numpy-vectorization批处理池:

with concurrent.futures.ThreadPoolExecutor() as executor:
    batchsize = 60000 // os.cpu_count()
    fs = [executor.submit(np.vector_function, x, 
                          np.arange(n, min(n+batchsize, 60000)))
          for n in range(0, 60000, batchsize)]
    concurrent.futures.wait(fs)
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我使用列表推导来提交所有工作并收集他们的未来,因为我们在循环中没有做任何其他事情.但是根据你的评论,听起来你在循环中还有其他想要做的事情.所以,让我们将它转​​换回一个明确的for声明:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = []
    for n in range(60000):
        fs.append(executor.submit(function, x, n))
    concurrent.futures.wait(fs)
Run Code Online (Sandbox Code Playgroud)

现在,无论你想在循环内添加什么,你都可以.


但是,我不认为你真的想在该循环中添加任何内容.循环只是尽快提交所有作业; 它是wait等待它们全部完成的功能,而且你可能想要提前退出.

要做到这一点,你可以用waitFIRST_COMPLETED标志,但它更易于使用as_completed.

此外,我假设error是某种由任务设置的值.在这种情况下,你需要Lock在它周围放置一个,就像线程之间共享的任何其他可变值一样.(这是一个地方,在a ProcessPoolExecutor和a 之间只有一行差异ThreadPoolExecutor- 如果你使用流程,你需要multiprocessing.Lock代替threading.Lock.)

所以:

error_lock = threading.Lock
error = []

def function(x, n):
    # blah blah
    try:
        # blah blah
    except Exception as e:
        with error_lock:
            error.append(e)
    # blah blah

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    for f in concurrent.futures.as_completed(fs):
        do_something_with(f.result())
        with error_lock:
            if len(error) > 1: exit()
Run Code Online (Sandbox Code Playgroud)

但是,您可能需要考虑不同的设计.一般来说,如果你可以避免线程之间的共享,你的生活会变得容易多了.并且期货旨在通过让您返回值或引发异常来实现这一点,就像常规函数调用一样.这f.result()将为您提供返回值或引发引发的异常.因此,您可以将该代码重写为:

def function(x, n):
    # blah blah
    # don't bother to catch exceptions here, let them propagate out

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    error = []
    for f in concurrent.futures.as_completed(fs):
        try:
            result = f.result()
        except Exception as e:
            error.append(e)
            if len(error) > 1: exit()
        else:
            do_something_with(result)
Run Code Online (Sandbox Code Playgroud)

请注意这与docs中的ThreadPoolExecutor示例有多相似.只要任务不需要彼此交互,这个简单的模式足以处理几乎没有锁的任何东西.