Python中的多处理,同时限制正在运行的进程数

ste*_*ast 18 python multithreading multiprocessing

我想同时运行program.py的多个实例,同时限制同时运行的实例数(例如,我的系统上可用的CPU内核数).例如,如果我有10个内核并且总共需要运行1000次program.py,那么在任何给定时间只会创建并运行10个实例.

我已经尝试过使用多处理模块,多线程和使用队列,但在我看来似乎没有任何东西能够实现简单的实现.我遇到的最大问题是找到一种方法来限制同时运行的进程数.这很重要,因为如果我一次创建1000个进程,它就相当于一个fork炸弹.我不需要以编程方式从进程返回的结果(它们输出到磁盘),并且所有进程都彼此独立地运行.

任何人都可以给我一些建议或一个例子,说明如何在python中实现这一点,甚至是bash?我发布到目前为止我使用队列编写的代码,但它没有按预期工作,可能已经走错了路.

非常感谢.

jdi*_*jdi 21

我知道你提到过Pool.map方法对你没有多大意义.地图只是一种简单的方法,可以为它提供工作源,并且可以调用它来应用于每个项目.在func该地图可以是任何入口点做定为arg的实际工作.

如果这对您来说不合适,我在这里有一个关于使用Producer-Consumer模式的非常详细的答案:https://stackoverflow.com/a/11196615/496445

基本上,您创建一个队列,并启动N个工作人员.然后,您可以从主线程提供队列,也可以创建一个为队列提供信息的Producer进程.工作人员只是继续从队列中获取工作,并且永远不会发生比您已经开始的进程数更多的并发工作.

您还可以选择对队列设置限制,以便在已经有太多未完成的工作时阻塞生产者,如果您还需要对生产者消耗的速度和资源施加约束.

被调用的工作函数可以做任何你想做的事情.这可以是一些系统命令的包装,或者它可以导入你的python lib并运行主例程.有一些特定的进程管理系统可以让你设置configs来在有限的资源下运行你的任意可执行文件,但这只是一个基本的python方法.

我的其他答案的片段:

基本池:

from multiprocessing import Pool

def do_work(val):
    # could instantiate some other library class,
    # call out to the file system,
    # or do something simple right here.
    return "FOO: %s" % val

pool = Pool(4)
work = get_work_args()
results = pool.map(do_work, work)
Run Code Online (Sandbox Code Playgroud)

使用流程管理器和生产者

from multiprocessing import Process, Manager
import time
import itertools

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()

        # exit signal 
        if item == None:
            return

        # fake work
        time.sleep(.5)
        result = item

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    # this could also be started in a producer process
    # instead of blocking
    iters = itertools.chain(get_work_args(), (None,)*num_workers)
    for item in iters:
        work.put(item)

    for p in pool:
        p.join()

    print results
Run Code Online (Sandbox Code Playgroud)


小智 2

Bash 脚本而不是 Python,但我经常使用它进行简单的并行处理:

#!/usr/bin/env bash
waitForNProcs()
{
 nprocs=$(pgrep -f $procName | wc -l)
 while [ $nprocs -gt $MAXPROCS ]; do
  sleep $SLEEPTIME
  nprocs=$(pgrep -f $procName | wc -l)
 done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
 waitForNProcs
 ./$procName $file &
done
Run Code Online (Sandbox Code Playgroud)

或者对于非常简单的情况,另一个选项是 xargs,其中 P 设置进程数

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
Run Code Online (Sandbox Code Playgroud)