为什么并行任务在第一时间总是很慢?

mar*_*ark 5 python parallel-processing bash joblib

我有一些分类器,我想对一个样本进行评估。由于它们彼此独立,因此可以并行运行此任务。这意味着我要并行化它。

我用python和bash脚本尝试过。问题是,当我第一次运行该程序时,大约需要30到40秒才能完成。当我连续多次运行该程序时,只需1s-3s即可完成。即使我用不同的输入来输入分类器,我也得到了不同的结果,因此似乎没有缓存。当我运行其他程序并随后重新运行该程序时,它又需要40秒钟才能完成。

我在htop中还观察到,第一次运行该程序时CPU利用率不高,但是当我一次又一次重新运行时,CPU利用率就很高。

有人可以向我解释这种奇怪的行为吗?我如何避免这种情况,这样即使程序第一次运行也会很快?

这是python代码:

import time
import os
from fastText import load_model
from joblib import delayed, Parallel, cpu_count
import json

os.system("taskset -p 0xff %d" % os.getpid())

def format_duration(start_time, end_time):
    m, s = divmod(end_time - start_time, 60)
    h, m = divmod(m, 60)
    return "%d:%02d:%02d" % (h, m, s)

def classify(x, classifier_name, path):
    f = load_model(path + os.path.sep + classifier_name)    
    labels, probabilities = f.predict(x, 2)
    if labels[0] == '__label__True':
        return classifier_name
    else:
        return None

if __name__ == '__main__':
    with open('classifier_names.json') as json_data:
        classifiers = json.load(json_data)
    x = "input_text"

    Parallel(n_jobs=cpu_count(), verbose=100, backend='multiprocessing', pre_dispatch='all') \
        (delayed(perform_binary_classification)
         (x, classifier, 'clfs/') for
         classifier in classifiers)

    end_time = time.time()
    print(format_duration(start_time, end_time))
Run Code Online (Sandbox Code Playgroud)

这是bash代码:

#!/usr/bin/env bash
N=4
START_TIME=$SECONDS
open_sem(){
    mkfifo pipe-$$
    exec 3<>pipe-$$
    rm pipe-$$
    local i=$1
    for((;i>0;i--)); do
        printf %s 000 >&3
    done
}
run_with_lock(){
    local x
    read -u 3 -n 3 x && ((0==x)) || exit $x
    (
    "$@" 
    printf '%.3d' $? >&3
    )&
}
open_sem $N
for d in classifiers/* ; do
    run_with_lock ~/fastText/fasttext predict "$d" test.txt 
done

ELAPSED_TIME=$(($SECONDS - $START_TIME))
echo time taken $ELAPSED_TIME seconds
Run Code Online (Sandbox Code Playgroud)

已编辑

更大的图景是我正在使用2种API方法运行flask应用程序。它们每个都调用使分类并行化的函数。当我执行请求时,它的行为与下面的程序相同。对方法A的第一个请求要花很多时间,随后的请求要花1秒。当我切换到方法B时,它的行为与方法A相同。如果我像A,B,A,B一样在方法A和方法B之间多次切换,则每个请求都需要40秒钟才能完成。

web*_*ebb 1

一种方法是修改 Python 代码以使用事件循环,始终保持运行状态,并在检测到新作业时并行执行新作业。实现此目的的一种方法是拥有一个作业目录,并在有新作业待办事项时将文件放入该目录中。python 脚本还应该将已完成的作业移出该目录,以防止多次运行它们。当目录中发生任何变化时,如何使用 Python Watchdog 运行函数?

另一种选择是使用通过管道传输到 python 脚本的 fifo 文件,并向该文件添加新行以执行新作业。https://www.linuxjournal.com/content/using-named-pipes-fifos-bash

我个人不喜欢在 python 中并行化,更喜欢使用 GNU 并行在 bash 中并行化。为了做到这一点,我会

  • 使用 bash 和 GNU 并行实现事件循环和作业目录或 fifo 文件作业队列
  • 修改python脚本以删除所有并行代码
  • 从标准输入读取每个作业规范
  • 在循环中串行处理每一个
  • 将作业通过管道传送到并行,将它们传送到 ncpu python 进程,每个进程永远运行,等待来自 stdin 的下一个作业

例如,类似:

run_jobs.sh:
mkfifo jobs
cat jobs | parallel --pipe --round-robin -n1 ~/fastText/fasttext

queue_jobs.sh:
echo jobspec >> jobs

.py:
for jobspec in sys.stdin:
    ...
Run Code Online (Sandbox Code Playgroud)

这样做的缺点是所有ncpu python进程都可能存在启动缓慢的问题,但它们可以无限期地保持运行,因此问题变得无关紧要,并且代码更简单,更易于调试和维护。

使用作业目录和每个作业规范的文件而不是 fifo 作业队列需要稍微多的代码,但它也可以更直接地查看哪些作业已排队以及哪些作业已完成。