Python,并行处理大型文本文件

Xia*_*gwu 5 python parallel-processing

采样数据文件(SAM文件)中的记录:

M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
......
Run Code Online (Sandbox Code Playgroud)
  • 数据文件是逐行的
  • 数据文件的大小从1G​​-5G不等。

我需要逐行浏览数据文件中的记录,从每一行获取一个特定值(例如,第四值,66439384),然后将此值传递给另一个函数进行处理。然后将更新一些结果计数器。

基本的工作流程是这样的:

# global variable, counters will be updated in search function according to the value passed. 
counter_a = 0    
counter_b = 0
counter_c = 0

open textfile:
    for line in textfile:
        value = line.split()[3]
        search_function(value)    # this function takes abit long time to process

def search_function (value):
    some conditions checking:
        update the counter_a or counter_b or counter_c
Run Code Online (Sandbox Code Playgroud)

使用单个进程代码和大约1.5G数据文件,大约需要20个小时来运行一个数据文件中的所有记录。我需要更快的代码,因为有超过30种此类数据文件。

我当时正在考虑并行处理N个数据块,每个数据块将在上述工作流程中执行并同时更新全局变量(counter_a,counter_b,counter_c)。但是我不知道如何在代码中实现这一目标,或者这是否可行。

我可以使用以下服务器服务器:24个处理器和大约40G RAM。

任何人都可以帮忙吗?非常感谢。

Ant*_*wns 4

最简单的方法可能是使用现有代码一次性处理所有 30 个文件——仍然需要一整天的时间,但您可以一次性完成所有文件。(即“9个月内生9个孩子”很容易,“1个月内生1个孩子”很难)

如果您确实想更快地完成单个文件,这将取决于计数器的实际更新方式。如果几乎所有工作都只是分析价值,您可以使用多处理模块卸载它:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print counter_a, counter_b, counter_c
Run Code Online (Sandbox Code Playgroud)

这将允许 50 个 Slowfunc 并行运行,因此不需要 1000 秒(= 100k * 0.01 秒),而是需要 20 秒(100k/50)* 0.01 秒才能完成。如果您可以像上面那样将函数重组为“slowfunc”和“add_to_counter”,那么您应该能够获得 24 倍的加速。