Using multiprocessing.Process with a maximum number of simultaneous processes

Bre*_*ett 54 python multithreading multiprocessing

I have the following Python code:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

This runs fine. However, MAX_PROCESSES is a variable and can be any value between 1 and 512. Since I only run this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I have looked into multiprocessing.Queue, but it does not look like what I need - or perhaps I am interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Processes running?

tre*_*ddy 79

It might make the most sense to use multiprocessing.Pool, which produces a pool of worker processes based on the maximum number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"

And it is also handy to know there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.
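
For instance, a minimal sketch that sizes the pool from the reported core count (the squaring worker is just a placeholder):

import multiprocessing

def f(x):
    return x * x

if __name__ == '__main__':
    # one worker per core/thread reported by the OS
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        print(pool.map(f, range(10)))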

Edit: Here is some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print('hello', name)

if __name__ == '__main__':
    pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
    for i in range(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()

  • `multiprocessing.cpu_count() - 1 or 1` can be a useful heuristic for deciding how many processes to run in parallel: the `- 1` avoids locking up the system by monopolising all cores, and the `or 1` provides a graceful fallback to single-core running if only one CPU is available (see the sketch after this list). (33 upvotes)
  • Note that `multiprocessing.cpu_count()` is not the number of cores but the number of threads (in the hyperthreading sense). (4 upvotes)
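
A minimal sketch of that heuristic (only the pool-size expression comes from the comment above; the rest is illustrative):

import multiprocessing

# leave one core free for the rest of the system, but
# fall back to a single worker on a single-CPU machine
n_workers = multiprocessing.cpu_count() - 1 or 1
print(n_workers)   # e.g. 7 on an 8-thread machine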

mak*_*fly 17

I think Semaphore is what you are looking for; it blocks the main process after counting down to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other 
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more 
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making code more readable,
    # but may lead to problem.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and running, instead it will carry 
        # on until all 1000 processes are created.
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The code above will create total_task_num processes, but only concurrency of them will actually run, while the others are blocked, consuming precious system resources.
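
If creating total_task_num Process objects up front is itself the problem, a fixed pool of workers that pulls tasks from a queue avoids it entirely. Here is a minimal sketch using the standard library's concurrent.futures (not part of the original answer; the worker mirrors the f above):

import time
from concurrent.futures import ProcessPoolExecutor

def f(name):
    print('process {} starting doing business'.format(name))
    time.sleep(5)

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    # only `concurrency` worker processes ever exist;
    # the 1000 tasks are queued, not turned into 1000 processes
    with ProcessPoolExecutor(max_workers=concurrency) as executor:
        executor.map(f, range(total_task_num))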

  • I got the error "OSError: [Errno 24] Too many open files" while starting the processes! Am I doing something wrong? (3 upvotes)

Bae*_*sch 7

More generally, it could also look like this:

import multiprocessing

def chunks(l, n):
    # yield successive n-sized chunks of list l
    for i in range(0, len(l), n):
        yield l[i:i + n]

def f(i, param):
    # placeholder worker; substitute your own function
    print('hello', i, param)

numberOfThreads = 4
params = ['a', 'b', 'c', 'd', 'e']  # placeholder parameters; substitute your own

if __name__ == '__main__':
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    # start and join the processes in batches of numberOfThreads
    for i in chunks(jobs, numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Of course, this approach is rather crude (since it waits for every process in a chunk before continuing with the next chunk). It can still work well for cases where the function calls have roughly equal run times.