dask包没有使用所有核心?备择方案?

tam*_*jd1 6 python parallel-processing json export-to-csv dask

我有一个python脚本,它执行以下操作:i.它接受数据的输入文件(通常是嵌套的JSON格式)ii.将数据逐行传递给另一个函数,该函数将数据处理成所需的格式iii.最后它将输出写入文件.

这是我目前的简单python线,这样做......

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`
Run Code Online (Sandbox Code Playgroud)

这有效,但是python GIL将它限制在服务器上的一个核心,它的速度非常慢,特别是对于大量数据.

我通常处理的数据量约为4 gig gzip压缩,但偶尔我必须处理数百gig gzip压缩的数据.它不是必需的大数据,但仍无法在内存中进行处理,并且Python的GIL处理速度非常慢.

在寻找优化数据处理的解决方案时,我遇到了dask.虽然PySpark在当时似乎是我的明显解决方案,但是dask的承诺和它的简单性让我受益匪浅,我决定尝试一下.

经过对dask的大量研究以及如何使用它,我整理了一个非常小的脚本来复制我当前的过程.该脚本如下所示:

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
Run Code Online (Sandbox Code Playgroud)

这工作并产生与原始非dask脚本相同的结果,但它仍然只在服务器上使用一个CPU.所以,它根本没有帮助.事实上,它的速度较慢.

我究竟做错了什么?我错过了什么吗?我仍然相当新闻,所以如果我忽略了某些事情或者我应该做一些完全不同的事情,请告诉我.

另外,是否有任何替代方法可以使用服务器的全部容量(即所有CPU)来完成我需要做的事情?

谢谢,

Ť

MRo*_*lin 3

这里的问题是dask.dataframe.to_csv,它迫使您进入单核模式。

我建议使用dask.bag它来进行阅读和操作,然后并行转储到一堆 CSV 文件。转储到多个 CSV 文件比转储到单个 CSV 文件更容易协调。

import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()
Run Code Online (Sandbox Code Playgroud)

尝试并行读取单个 GZIP 文件也可能会出现问题,但以上内容应该可以帮助您入门。