python,如何逐步创建线程

use*_*807 5 python multithreading

我有一个项目列表aprox 60,000项 - 我想向数据库发送查询以检查它们是否存在以及它们是否确实返回了一些计算结果.我运行一个普通的查询,一个一个地遍历列表,查询已经运行了最后4天.我以为我可以使用线程模块来改进这一点.我做了这样的事

if __name__ == '__main__':
  for ra, dec in candidates:
    t = threading.Thread(target=search_sl, args=(ra,dec, q))
    t.start()
  t.join()
Run Code Online (Sandbox Code Playgroud)

我只测试了10个项目并且工作正常 - 当我提交了整个60k项目列表时,我遇到了错误,即"超出最大会话数".我想要做的是一次创建10个线程.当第一串线程完成激活时,我发送另一个请求,依此类推.

Mik*_*ain 7

您可以尝试使用多处理模块中提供的进程池.以下是python文档中的示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
Run Code Online (Sandbox Code Playgroud)

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

尝试增加进程数,直到达到系统可以支持的最大值.

  • 这不是一个线程池,而是一个进程池 - 当然这里没有太大的区别. (2认同)

Mak*_*cha 3

首先,您只加入最后一个线程。不能保证它会最后完成。你应该这样使用:

from time import sleep
delay = 0.5
tlist = [threading.Thread(target=search_sl, args=(ra,dec, q)) for ra, dec in candidates ]
map(lambda t:t.start(), tlist)
while(any(map(lambda t:t.isAlive()))): sleep(delay)
Run Code Online (Sandbox Code Playgroud)

第二个问题是目前运行的 60K 线程需要非常巨大的硬件资源:-) 最好将任务排队,然后由工作人员处理。工作线程的数量必须受到限制。就像这样(还没有测试代码,但我希望这个想法很清楚):

from Queue import Queue
from threading import Thread
from time import sleep
tasks = Queue()
map(tasks.put, candidates)
maxthreads = 50
delay = 0.1
try:
    threads = [Thread(target=search_sl, args=tasks.get()) \
               for i in xrange(0,maxthreads) ]
except Queue.Empty:
    pass
map(lambda t:t.start(), threads)

while not tasks.empty():
    threads = filter(lambda t:t.isAlive(), threads)
    while len(threads) < maxthreads:
        try:
            t = Thread(target=search_sl, args=tasks.get())
            t.start()
            threads.append(t)
        except Queue.Empty:
            break
    sleep(delay)

while(any(map(lambda t:t.isAlive(), threads))): sleep(delay)
Run Code Online (Sandbox Code Playgroud)