CSV 日期解析的 Dask 性能缓慢?

Mik*_*hea 5 python performance multithreading pandas dask

我一直在对一大堆文件进行大量文本处理,包括大型 CSV 文件和大量小型 XML 文件。有时我在做聚合计数,但很多时候我在做 NLP 类型的工作,以更深入地查看这些文件中的内容,而不是标记或已经结构化的内容。

我一直在使用多处理库在多个 CPU 上执行这些计算,但我已经爱上了 Dask 背后的想法,并且在网络上和同事都强烈推荐它。

我在这里问了一个关于 Dask 性能的类似问题:

Python Dask 包性能下降?

和 MRocklin ( /sf/users/43163151/ ) 让我知道加载大量小文件可能会破坏性能。

然而,当我在单个大文件(200mb)上运行它时,我仍然没有让它表现得很好。下面是一个例子:

我有一个 900,000 行的推文 CSV 文件,我想快速加载它并解析“created_at”字段。以下是我完成的三种方法以及每种方法的基准。我在配备 16GB 内存的新 i7 2016 MacBook Pro 上运行此程序。

import pandas
import dask.dataframe as dd
import multiprocessing

%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 2 分 31 秒,系统:807 毫秒,总计:2 分 32 秒挂墙时间:2 分 32 秒

%%time
# Multithreaded chunking
def parse_frame_dates(frame):
    frame["created_at"] = pandas.to_datetime(frame["created_at"])
    return(frame)

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 5.65 秒,系统:1.47 秒,总计:7.12 秒挂墙时间:1 分 10 秒

%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                 parse_dates = ["created_at"], blocksize = 10000000).compute()
Run Code Online (Sandbox Code Playgroud)

CPU 时间:用户 2 分 59 秒,系统:26.2 秒,总计:3 分 25 秒挂墙时间:3 分 12 秒

我在许多不同的 Dask 比较中发现了这些类型的结果,但即使让它正常工作也可能为我指明正确的方向。

简而言之,对于这些类型的任务,我如何才能从 Dask 中获得最佳性能?为什么以其他方式完成的单线程和多线程技术似乎都表现不佳?

MRo*_*lin 4

我怀疑 Pandas read_csv 日期时间解析代码是纯 python 的,因此不会从使用线程中受益太多,而这是 dask.dataframe 默认使用的。

使用进程时您可能会看到更好的性能。

我怀疑以下方法会工作得更快:

import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get)  # set processes as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)
Run Code Online (Sandbox Code Playgroud)

进程的问题在于进程间通信可能会变得昂贵。我len(d)在上面进行了明确的计算,而不是d.compute()为了避免必须拾取工作进程中的所有 pandas 数据帧并将它们移动到主调用进程。在实践中,无论如何,这很常见,因为人们很少想要完整的数据帧,而是对数据帧进行一些计算。

这里的相关文档页面是http://dask.readthedocs.io/en/latest/scheduler-choice.html

您可能还想在单台计算机上使用分布式调度程序,而不是使用多处理调度程序。上面引用的文档中也对此进行了描述。

$ pip install dask distributed

from dask.distributed import Client
c = Client()  # create processes and set as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)
Run Code Online (Sandbox Code Playgroud)