Bre*_*ett 54 python multithreading multiprocessing
I have this Python code:
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    # MAX_PROCESSES is set elsewhere and may be anywhere from 1 to 512
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()
It runs fine. However, MAX_PROCESSES is variable and can be any value between 1 and 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need - or perhaps I'm interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Process instances running?
tre*_*ddy 79
It might be most sensible to use multiprocessing.Pool, which produces a pool of worker processes based on the maximum number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)            # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print result.get(timeout=1)         # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))        # prints "[0, 1, 4,..., 81]"
And it's also handy to know that there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.
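For instance, a minimal sketch combining the two (written against Python 3; leaving one core free for the parent process is just an illustrative choice, not something the docs prescribe):

import multiprocessing

def f(x):
    return x * x

if __name__ == '__main__':
    # leave one core for the parent process; max() guards the single-core case
    workers = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(processes=workers) as pool:
        print(pool.map(f, range(10)))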
Edit: Here's some draft code that seems to work for your specific case:
import multiprocessing

def f(name):
    print 'hello', name

if __name__ == '__main__':
    # use all available cores, otherwise specify the number you want as an argument
    pool = multiprocessing.Pool()
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for all workers to finish
mak*_*fly 17
I think Semaphore is what you are looking for; it will block the main process after counting down to 0. Sample code:
from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:
from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making the code more readable,
    # but this may lead to problems
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and are running; instead it will carry
        # on until all 1000 processes are created
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
The code above will create total_task_num processes, but only concurrency of them will be running, while the others are blocked, consuming precious system resources.
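If creating all those Process objects up front is a concern, here is a sketch of the same concurrency cap built on the standard library's concurrent.futures.ProcessPoolExecutor (available since Python 3.2); max_workers plays the role of concurrency here, and f and the task count simply mirror the example above:

from concurrent.futures import ProcessPoolExecutor
import time

def f(name):
    print('process {} starting doing business'.format(name))
    time.sleep(5)  # simulate a time-consuming task

if __name__ == '__main__':
    # at most max_workers worker processes exist at any moment;
    # remaining tasks wait in the executor's internal queue
    with ProcessPoolExecutor(max_workers=20) as executor:
        for i in range(1000):
            executor.submit(f, i)
    # leaving the `with` block waits for all submitted tasks to finish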
More generally, it could also look like this:
import multiprocessing

def chunks(l, n):
    # yield successive n-sized chunks from l
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4

if __name__ == '__main__':
    jobs = []
    # `f` and `params` are assumed to be defined elsewhere,
    # as in the earlier examples
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    for i in chunks(jobs, numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()
Of course, this way is rather brutal (since it waits for every process in a chunk before continuing with the next chunk). Still, it can work well when the run times of the function calls are approximately equal.
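When run times are uneven, one way to avoid that per-chunk barrier is a sketch like the following, using Pool.imap_unordered so each worker picks up a new task as soon as it finishes its current one (the worker function and task count here are only placeholders):

import multiprocessing

def f(param):
    return param * param  # placeholder for the real work

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # results arrive in completion order, so one slow task
        # never stalls workers that have already finished
        for result in pool.imap_unordered(f, range(512)):
            print(result)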