Mik*_*hea 5 python performance multithreading pandas dask
我一直在对一大堆文件进行大量文本处理,包括大型 CSV 文件和大量小型 XML 文件。有时我在做聚合计数,但很多时候我在做 NLP 类型的工作,以更深入地查看这些文件中的内容,而不是标记或已经结构化的内容。
我一直在使用多处理库在多个 CPU 上执行这些计算,但我已经爱上了 Dask 背后的想法,并且在网络上和同事都强烈推荐它。
我在这里问了一个关于 Dask 性能的类似问题:
和 MRocklin ( /sf/users/43163151/ ) 让我知道加载大量小文件可能会破坏性能。
然而,当我在单个大文件(200mb)上运行它时,我仍然没有让它表现得很好。下面是一个例子:
我有一个 900,000 行的推文 CSV 文件,我想快速加载它并解析“created_at”字段。以下是我完成的三种方法以及每种方法的基准。我在配备 16GB 内存的新 i7 2016 MacBook Pro 上运行此程序。
import pandas
import dask.dataframe as dd
import multiprocessing
%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 2 分 31 秒,系统:807 毫秒,总计:2 分 32 秒挂墙时间:2 分 32 秒
%%time
# Multithreaded chunking
def parse_frame_dates(frame):
frame["created_at"] = pandas.to_datetime(frame["created_at"])
return(frame)
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 5.65 秒,系统:1.47 秒,总计:7.12 秒挂墙时间:1 分 10 秒
%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000).compute()
Run Code Online (Sandbox Code Playgroud)
CPU 时间:用户 2 分 59 秒,系统:26.2 秒,总计:3 分 25 秒挂墙时间:3 分 12 秒
我在许多不同的 Dask 比较中发现了这些类型的结果,但即使让它正常工作也可能为我指明正确的方向。
简而言之,对于这些类型的任务,我如何才能从 Dask 中获得最佳性能?为什么以其他方式完成的单线程和多线程技术似乎都表现不佳?
我怀疑 Pandas read_csv 日期时间解析代码是纯 python 的,因此不会从使用线程中受益太多,而这是 dask.dataframe 默认使用的。
使用进程时您可能会看到更好的性能。
我怀疑以下方法会工作得更快:
import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get) # set processes as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)
Run Code Online (Sandbox Code Playgroud)
进程的问题在于进程间通信可能会变得昂贵。我len(d)在上面进行了明确的计算,而不是d.compute()为了避免必须拾取工作进程中的所有 pandas 数据帧并将它们移动到主调用进程。在实践中,无论如何,这很常见,因为人们很少想要完整的数据帧,而是对数据帧进行一些计算。
这里的相关文档页面是http://dask.readthedocs.io/en/latest/scheduler-choice.html
您可能还想在单台计算机上使用分布式调度程序,而不是使用多处理调度程序。上面引用的文档中也对此进行了描述。
$ pip install dask distributed
from dask.distributed import Client
c = Client() # create processes and set as default
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000)
len(d)
Run Code Online (Sandbox Code Playgroud)