使用 pandas 和 parquet 的效率

Xio*_*ion 3 pandas parquet dask pyarrow ibis

人们经常谈论使用镶木地板和熊猫。我正在努力了解与 pandas 一起使用时我们是否可以利用 parquet 文件的全部功能。例如,假设我有一个大 parquet 文件(按年份分区),有 30 列(包括年份、州、性别、姓氏)和许多行。我想加载镶木地板文件并执行随后的类似计算

import pandas as pd
df = pd.read_parquet("file.parquet")
df_2002 = df[df.year == 2002]
df_2002.groupby(["state", "gender"])["last_name"].count()
Run Code Online (Sandbox Code Playgroud)

在此查询中仅使用 4 列(共 30 列)并且仅2002使用年份分区。这意味着我们只想引入此计算所需的列和行,并且在具有谓词和投影下推的 parquet 中可以实现类似的操作(以及我们使用 parquet 的原因)。

但我试图了解这个查询在 pandas 中的行为方式。当我们打电话的那一刻,它会把所有的事情都记起来吗df = pd.read_parquet("file.parquet)?或者这里应用了任何惰性因素来引入投影和谓词下推?如果情况并非如此,那么将 pandas 与 parquet 一起使用还有什么意义呢?任何这一切都可以通过arrow package

虽然我没用过dask只是想知道这种情况是否是在 dask 中处理的,因为他们是懒惰地执行的。

我确信这种情况在 Spark 世界中处理得很好,但只是想知道在本地场景中如何使用 pandas、arrow、dask、ibis 等包处理这些情况。

Pac*_*ace 8

我正在努力了解与 pandas 一起使用时我们是否可以利用 parquet 文件的全部功能。

TL;DR:是的,但与使用 Dask 之类的工具相比,您可能需要更加努力。

例如,假设我有一个大镶木地板文件(按年份分区)

这是迂腐的,但单个 parquet 文件不会对任何内容进行分区。Parquet“数据集”(文件集合)已分区。例如:

my_dataset/year=2002/data.parquet
my_dataset/year=2003/data.parquet
Run Code Online (Sandbox Code Playgroud)

当我们调用 df = pd.read_parquet("file.parquet) 时,它是否会将所有内容都带入内存?

是的。但是......你可以做得更好:

df = pd.read_parquet('/tmp/new_dataset', filters=[[('year','=', 2002)]], columns=['year', 'state', 'gender', 'last_name'])
Run Code Online (Sandbox Code Playgroud)

关键字filters会将过滤器向下传递给 pyarrow,pyarrow 将以下推方式将过滤器应用到分区(例如,了解需要读取哪些目录)和行组统计信息。

columns关键字会将列选择向下传递给 pyarrow,它将应用该选择以仅从磁盘读取指定的列。

这一切都可以通过 arrow 包实现吗?

pandasread_parquet文件中的所有内容都由 pyarrow 在幕后处理(除非您更改为其他引擎)。传统上,group_by将由 pandas(好吧,也许是 numpy)直接处理,但如果您想尝试在 pyarrow 中执行所有操作,pyarrow 也有一些实验性计算 API。

尽管我没有使用过 dask,只是想知道这种情况是否在 dask 中处理,因为它们是惰性执行的。

根据我的理解(我对 dask 没有太多经验),当你说......

df_2002 = df[df.year == 2002]
df_2002.groupby(["state", "gender"])["last_name"].count()
Run Code Online (Sandbox Code Playgroud)

...在 dask 数据框中,dask 会发现它可以应用下推过滤器和谓词,并且在加载数据时会这样做。因此,dask 负责确定应该应用哪些过滤器以及需要加载哪些列。这使您不必提前自己弄清楚。

完整示例(您可以用来strace验证它是否仅加载两个镶木地板文件之一,并且仅加载该文件的一部分):

import pyarrow as pa
import pyarrow.dataset as ds
import pandas as pd

import shutil

shutil.rmtree('/tmp/new_dataset')
tab = pa.Table.from_pydict({
    "year": ["2002", "2002", "2002", "2002", "2002", "2002", "2003", "2003", "2003", "2003", "2003", "2003"],
    "state": [ "HI",   "HI",   "HI",   "HI",   "CO",   "CO",   "HI",   "HI",   "CO",   "CO",   "CO",   "CO"],
    "gender": [ "M",    "F",   None,    "F",    "M",    "F",   None,    "F",    "M",    "F",    "M",    "F"],
 "last_name": ["Smi", "Will", "Stev", "Stan",  "Smi", "Will", "Stev", "Stan",  "Smi", "Will", "Stev", "Stan"],
    "bonus": [    0,      1,      2,      3,      4,      5,      6,      7,      8,      9,     10,     11]
})
ds.write_dataset(tab, '/tmp/new_dataset', format='parquet', partitioning=['year'], partitioning_flavor='hive')

df = pd.read_parquet('/tmp/new_dataset', filters=[[('year','=', 2002)]], columns=['year', 'state', 'gender', 'last_name'])
df_2002 = df[df.year == 2002]
print(df.groupby(["state", "gender"])["last_name"].count())
Run Code Online (Sandbox Code Playgroud)

免责声明:您在这里询问了许多技术。我与 Apache Arrow 项目密切合作,因此我的回答可能在这个方向上有偏差。

  • 来自 Dask-land,我不认为 Dask 直接找出对“read_parquet”的适当调用。但它确实支持类似于 pandas 的列修剪和谓词下推过滤(使用 pyarrow 和 fasparquet 引擎),并具有惰性求值。请参阅此处的一些基准:https://coiled.io/blog/parquet-file-column-pruning-predicate-pushdown/ (3认同)