mar*_*ark 5 python parallel-processing bash joblib
我有一些分类器,我想对一个样本进行评估。由于它们彼此独立,因此可以并行运行此任务。这意味着我要并行化它。
我用python和bash脚本尝试过。问题是,当我第一次运行该程序时,大约需要30到40秒才能完成。当我连续多次运行该程序时,只需1s-3s即可完成。即使我用不同的输入来输入分类器,我也得到了不同的结果,因此似乎没有缓存。当我运行其他程序并随后重新运行该程序时,它又需要40秒钟才能完成。
我在htop中还观察到,第一次运行该程序时CPU利用率不高,但是当我一次又一次重新运行时,CPU利用率就很高。
有人可以向我解释这种奇怪的行为吗?我如何避免这种情况,这样即使程序第一次运行也会很快?
这是python代码:
import time
import os
from fastText import load_model
from joblib import delayed, Parallel, cpu_count
import json
os.system("taskset -p 0xff %d" % os.getpid())
def format_duration(start_time, end_time):
m, s = divmod(end_time - start_time, 60)
h, m = divmod(m, 60)
return "%d:%02d:%02d" % (h, m, s)
def classify(x, classifier_name, path):
f = load_model(path + os.path.sep + classifier_name)
labels, probabilities = f.predict(x, 2)
if labels[0] == '__label__True':
return classifier_name
else:
return None
if __name__ == '__main__':
with open('classifier_names.json') as json_data:
classifiers = json.load(json_data)
x = "input_text"
Parallel(n_jobs=cpu_count(), verbose=100, backend='multiprocessing', pre_dispatch='all') \
(delayed(perform_binary_classification)
(x, classifier, 'clfs/') for
classifier in classifiers)
end_time = time.time()
print(format_duration(start_time, end_time))
Run Code Online (Sandbox Code Playgroud)
这是bash代码:
#!/usr/bin/env bash
N=4
START_TIME=$SECONDS
open_sem(){
mkfifo pipe-$$
exec 3<>pipe-$$
rm pipe-$$
local i=$1
for((;i>0;i--)); do
printf %s 000 >&3
done
}
run_with_lock(){
local x
read -u 3 -n 3 x && ((0==x)) || exit $x
(
"$@"
printf '%.3d' $? >&3
)&
}
open_sem $N
for d in classifiers/* ; do
run_with_lock ~/fastText/fasttext predict "$d" test.txt
done
ELAPSED_TIME=$(($SECONDS - $START_TIME))
echo time taken $ELAPSED_TIME seconds
Run Code Online (Sandbox Code Playgroud)
已编辑
更大的图景是我正在使用2种API方法运行flask应用程序。它们每个都调用使分类并行化的函数。当我执行请求时,它的行为与下面的程序相同。对方法A的第一个请求要花很多时间,随后的请求要花1秒。当我切换到方法B时,它的行为与方法A相同。如果我像A,B,A,B一样在方法A和方法B之间多次切换,则每个请求都需要40秒钟才能完成。
一种方法是修改 Python 代码以使用事件循环,始终保持运行状态,并在检测到新作业时并行执行新作业。实现此目的的一种方法是拥有一个作业目录,并在有新作业待办事项时将文件放入该目录中。python 脚本还应该将已完成的作业移出该目录,以防止多次运行它们。当目录中发生任何变化时,如何使用 Python Watchdog 运行函数?
另一种选择是使用通过管道传输到 python 脚本的 fifo 文件,并向该文件添加新行以执行新作业。https://www.linuxjournal.com/content/using-named-pipes-fifos-bash
我个人不喜欢在 python 中并行化,更喜欢使用 GNU 并行在 bash 中并行化。为了做到这一点,我会
例如,类似:
run_jobs.sh:
mkfifo jobs
cat jobs | parallel --pipe --round-robin -n1 ~/fastText/fasttext
queue_jobs.sh:
echo jobspec >> jobs
.py:
for jobspec in sys.stdin:
...
Run Code Online (Sandbox Code Playgroud)
这样做的缺点是所有ncpu python进程都可能存在启动缓慢的问题,但它们可以无限期地保持运行,因此问题变得无关紧要,并且代码更简单,更易于调试和维护。
使用作业目录和每个作业规范的文件而不是 fifo 作业队列需要稍微多的代码,但它也可以更直接地查看哪些作业已排队以及哪些作业已完成。
归档时间: |
|
查看次数: |
211 次 |
最近记录: |