使用celery处理大量文本文件

Chr*_* F. 11 python performance hpc rabbitmq celery

背景

我正在研究使用芹菜(3.1.8)来处理每个文本文件(~30GB).这些文件采用fastq格式,包含大约118M的测序"读数",基本上每个都是标题,DNA序列和质量字符串的组合.此外,这些序列来自配对末端测序运行,因此我同时迭代两个文件(通过itertools.izip).我希望能够做的是将每对读取,发送到队列,并在我们集群中的一台机器上处理它们(不关心哪些)以返回清理后的版本如果清洁需要发生(例如,基于质量).

我已经建立了芹菜和兔子,我的工作人员如下:

celery worker -A tasks --autoreload -Q transient 
Run Code Online (Sandbox Code Playgroud)

并配置如下:

from kombu import Queue

BROKER_URL = 'amqp://guest@godel97'
CELERY_RESULT_BACKEND = 'rpc'
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT=['pickle', 'json']
CELERY_TIMEZONE = 'America/New York'
CELERY_ENABLE_UTC = True
CELERYD_PREFETCH_MULTIPLIER = 500

CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),
    Queue('transient', routing_key='transient',delivery_mode=1),
)
Run Code Online (Sandbox Code Playgroud)

我选择使用rpc后端和pickle序列化来提高性能,以及不在"瞬态"队列中写入任何内容(通过delivery_mode).

芹菜创业

要设置芹菜框架,我首先在64路盒子上启动rabbitmq服务器(3.2.3,Erlang R16B03-1),将日志文件写入fast/tmp磁盘.工作进程(如上所述)在群集上的每个节点(大约34个)上启动,范围从8路到64路SMP,总共688个核心.因此,我有大量可用的CPU供工作人员用于处理队列.

工作提交/表现

芹菜启动并运行后,我通过ipython笔记本提交作业,如下所示:

files = [foo, bar]
f1 = open(files[0])
f2 = open(files[1])
res = []
count = 0
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)):
    count += 1
    res.append(tasks.process_read_pair.s(r1, r2))
        if count == 10000:
        break
t.stop()
g = group(res)
for task in g.tasks:
    task.set(queue="transient")
Run Code Online (Sandbox Code Playgroud)

对于10000对读取,这需要大约1.5秒.然后,我呼吁小组延迟提交给工人,大约需要20秒,如下所示:

result = g.delay()
Run Code Online (Sandbox Code Playgroud)

使用rabbitmq控制台进行监控,我发现我做得很好,但速度不够快.

rabbitmq图

那么,有没有办法加快速度呢?我的意思是,我希望看到每秒至少处理50,000个读取对而不是500.在我的芹菜配置中是否有任何明显的缺失?我的工人和兔子日志基本上是空的.会喜欢一些关于如何提升表现的建议.每个单独的读取对也非常快速地处理:

[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec
Run Code Online (Sandbox Code Playgroud)

到这一点

所以到目前为止,我已经用谷歌搜索了所有我能想到的芹菜,性能,路由,rabbitmq等等.我已经浏览了芹菜网站和文档.如果我无法获得更高的性能,我将不得不放弃这种方法而转向另一种解决方案(基本上将工作分成许多较小的物理文件,并使用多处理或其他方式直接在每个计算节点上处理它们).但是,如果无法在群集上传播此负载,那将是一种耻辱.此外,这似乎是一个非常优雅的解决方案.

在此先感谢您的帮助!

Xav*_*lle 1

One solution you have is that the reads are highly compressible so replacing the following

res.append(tasks.process_read_pair.s(r1, r2))
Run Code Online (Sandbox Code Playgroud)

by

res.append(tasks.process_bytes(zlib.compress(pickle.dumps((r1, r2))),
                                      protocol = pickle.HIGHEST_PROTOCOL),
                         level=1))
Run Code Online (Sandbox Code Playgroud)

并呼叫pickle.loads(zlib.decompress(obj))另一边的 a。

对于足够长的 DNA 序列,它应该赢得一个大因素,如果它们不够长,您可以将它们按块分组到一个数组中,然后转储和压缩。

如果您还没有这样做,另一个胜利可能是使用 ZeroMQ 进行传输。

我不确定 process_byte 应该是什么