Xia*_*gwu 5 python parallel-processing
采样数据文件(SAM文件)中的记录:
M01383 0 chr4 66439384 255 31M * 0 0 AAGAGGA GFAFHGD MD:Z:31 NM:i:0
M01382 0 chr1 241995435 255 31M * 0 0 ATCCAAG AFHTTAG MD:Z:31 NM:i:0
......
Run Code Online (Sandbox Code Playgroud)
我需要逐行浏览数据文件中的记录,从每一行获取一个特定值(例如,第四值,66439384),然后将此值传递给另一个函数进行处理。然后将更新一些结果计数器。
基本的工作流程是这样的:
# global variable, counters will be updated in search function according to the value passed.
counter_a = 0
counter_b = 0
counter_c = 0
open textfile:
for line in textfile:
value = line.split()[3]
search_function(value) # this function takes abit long time to process
def search_function (value):
some conditions checking:
update the counter_a or counter_b or counter_c
Run Code Online (Sandbox Code Playgroud)
使用单个进程代码和大约1.5G数据文件,大约需要20个小时来运行一个数据文件中的所有记录。我需要更快的代码,因为有超过30种此类数据文件。
我当时正在考虑并行处理N个数据块,每个数据块将在上述工作流程中执行并同时更新全局变量(counter_a,counter_b,counter_c)。但是我不知道如何在代码中实现这一目标,或者这是否可行。
我可以使用以下服务器服务器:24个处理器和大约40G RAM。
任何人都可以帮忙吗?非常感谢。
最简单的方法可能是使用现有代码一次性处理所有 30 个文件——仍然需要一整天的时间,但您可以一次性完成所有文件。(即“9个月内生9个孩子”很容易,“1个月内生1个孩子”很难)
如果您确实想更快地完成单个文件,这将取决于计数器的实际更新方式。如果几乎所有工作都只是分析价值,您可以使用多处理模块卸载它:
import time
import multiprocessing
def slowfunc(value):
time.sleep(0.01)
return value**2 + 0.3*value + 1
counter_a = counter_b = counter_c = 0
def add_to_counter(res):
global counter_a, counter_b, counter_c
counter_a += res
counter_b -= (res - 10)**2
counter_c += (int(res) % 2)
pool = multiprocessing.Pool(50)
results = []
for value in range(100000):
r = pool.apply_async(slowfunc, [value])
results.append(r)
# don't let the queue grow too long
if len(results) == 1000:
results[0].wait()
while results and results[0].ready():
r = results.pop(0)
add_to_counter(r.get())
for r in results:
r.wait()
add_to_counter(r.get())
print counter_a, counter_b, counter_c
Run Code Online (Sandbox Code Playgroud)
这将允许 50 个 Slowfunc 并行运行,因此不需要 1000 秒(= 100k * 0.01 秒),而是需要 20 秒(100k/50)* 0.01 秒才能完成。如果您可以像上面那样将函数重组为“slowfunc”和“add_to_counter”,那么您应该能够获得 24 倍的加速。