Python 多处理进程在一段时间后休眠

Gr3*_*und 8 python multithreading replace cpu-usage multiprocessing

我有一个脚本,它在一个目录中运行并搜索给定字符串的给定结尾(即 .xml)的所有文件并替换它们。为了实现这一点,我使用了 python 多处理库。

例如,我使用了 1100 个 .xml 文件,其中包含大约 200MB 的数据。我的 MBP '15 15" 的完整执行时间为 8 分钟。

但是几分钟后,进程的进程将进入睡眠状态,我在“顶部”中看到(这里是 7m 之后......)。

最高输出

PID   COMMAND      %CPU  TIME     #TH    #WQ  #PORT MEM    PURG   CMPR PGRP PPID STATE    BOOSTS         %CPU_ME %CPU_OTHRS
1007  Python       0.0   07:03.51 1      0    7     5196K  0B     0B   998  998  sleeping *0[1]          0.00000 0.00000
1006  Python       99.8  07:29.07 1/1    0    7     4840K  0B     0B   998  998  running  *0[1]          0.00000 0.00000
1005  Python       0.0   02:10.02 1      0    7     4380K  0B     0B   998  998  sleeping *0[1]          0.00000 0.00000
1004  Python       0.0   04:24.44 1      0    7     4624K  0B     0B   998  998  sleeping *0[1]          0.00000 0.00000
1003  Python       0.0   04:25.34 1      0    7     4572K  0B     0B   998  998  sleeping *0[1]          0.00000 0.00000
1002  Python       0.0   04:53.40 1      0    7     4612K  0B     0B   998  998  sleeping *0[1]          0.00000 0.00000
Run Code Online (Sandbox Code Playgroud)

所以现在只有一个进程在做所有的工作,而其他进程在 4 分钟后就睡着了。

代码片段

# set cpu pool to cores in computer
pool_size = multiprocessing.cpu_count()

# create pool
pool = multiprocessing.Pool(processes=pool_size)

# give pool function and input data - here for each file in file_list
pool_outputs = pool.map(check_file, file_list)

# if no more tasks are available: close all
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

那么为什么所有进程都在休眠呢?

我的猜测:文件列表与池中的所有工作人员分开(每个工作人员数量相同),少数人只是“幸运”获得小文件 - 因此提前完成。这是真的吗?我只是在想它更像一个队列,这样每个工作人员在完成时都会得到一个新文件 - 直到列表为空。

Gr3*_*und 5

正如 @Felipe-Lema 指出的,它是一个经典的 RTFM。

我使用多处理队列而不是池重新设计了脚本的上述部分,并改进了运行时:

def check_files(file_list):
    """Checks and replaces lines in files
    @param file_list: list of files to search
    @return counter: number of occurrence """

    # as much workers as CPUs are available (HT included)
    workers = multiprocessing.cpu_count()

    # create two queues: one for files, one for results
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    # add every file to work queue
    for filename in file_list:
        work_queue.put(filename)

    # start processes
    for w in xrange(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    # wait until all processes finished
    for p in processes:
        p.join()

    done_queue.put('STOP')

    # beautify results and return them
    results = []
    for status in iter(done_queue.get, 'STOP'):
        if status is not None:
             results.append(status)

     return results
Run Code Online (Sandbox Code Playgroud)