Read specific partitions from a partitioned parquet dataset with pyarrow

suvayu 6 python parquet apache-arrow pyarrow

I have a somewhat large (~20 GB) partitioned dataset in parquet format. I would like to read specific partitions from the dataset with pyarrow. I thought I could accomplish this with pyarrow.parquet.ParquetDataset, but that does not seem to be the case. Here is a small example to illustrate what I want.

To create a random dataset:

from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset


def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a paritioned dataset

    data
    ├── part1=foo
    │   └── part2=True
    ├── part1=foo
    │   └── part2=False
    ├── part1=bar
    │   └── part2=True
    └── part1=bar
        └── part2=False

    """
    path_tmpl = '/'.join(['{}={}'] * len(partitions))  # part=value
    path_tmpl = '{}/{}'.format(basepath, path_tmpl)    # part1=val/part2=val

    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows of random data per partition
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    # write one randomly named parquet file into each partition directory
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()
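For reference, get_partitions simply expands the partition spec into one directory path per combination; with the partitions dict above it returns:

>>> get_partitions('data', partitions)
['data/part1=foo/part2=True',
 'data/part1=foo/part2=False',
 'data/part1=bar/part2=True',
 'data/part1=bar/part2=False']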

I want to read all values of partition one, and only True for partition two. With pandas.read_parquet this is not possible, as I always have to read the entire dataset. I tried the following with pyarrow:

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()

This did not work as intended either; note that the partition columns are missing:

>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
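As a stopgap, the partition values can be recovered from the directory names and re-attached by hand after reading. A minimal sketch of that idea (read_partitions is a hypothetical helper, not a pyarrow API):

import re
import pandas as pd
from pyarrow.parquet import ParquetDataset

def read_partitions(paths):
    """Read parquet files, re-attaching key=value path components as columns."""
    frames = []
    for path in paths:
        df = ParquetDataset([path]).read().to_pandas()
        # parse 'part1=foo' style components out of the file path;
        # note the values come back as strings, not booleans
        for key, value in re.findall(r'([^/=]+)=([^/=]+)', path):
            df[key] = value
        frames.append(df)
    return pd.concat(frames, ignore_index=True)

df2 = read_partitions(files)  # files as computed above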

With pyspark, on the other hand, I can do this easily:

def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)


spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()

As you can see below, the partition columns are included:

>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')

Can this be done with pyarrow or pandas, or do I need a custom implementation?

Update: as requested by Wes, this is now a JIRA.

ji.*_*.xu 10

As of pyarrow version 0.10.0, you can use the filters kwarg to do the query. In your case it would look something like this:

import pyarrow.parquet as pq
dataset = pq.ParquetDataset('path-to-your-dataset', filters=[('part2', '=', 'True'),])
table = dataset.read()
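Note that the partition value is matched as the string 'True': partition values are parsed from the directory names (part2=True), so they come back as strings rather than booleans. A short sketch, assuming the 'data' directory from the question, of going on to pandas, plus an OR query via filters in disjunctive normal form as accepted by newer pyarrow versions:

import pyarrow.parquet as pq

# read only the part2=True partitions and convert to pandas
dataset = pq.ParquetDataset('data', filters=[('part2', '=', 'True')])
df = dataset.read().to_pandas()

# newer pyarrow versions also accept a list of lists of tuples
# (disjunctive normal form): inner lists are ANDed, the outer list ORed
# dataset = pq.ParquetDataset(
#     'data',
#     filters=[[('part1', '=', 'foo'), ('part2', '=', 'True')],
#              [('part1', '=', 'bar')]])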



Wes McKinney 5

Question: how do you read specific partitions from a partitioned parquet dataset with pyarrow?

Answer: you can't right now.

Can you create an Apache Arrow JIRA requesting this feature at https://issues.apache.org/jira?

This is something we should be able to support in the pyarrow API, but it will require someone to implement it. Thank you!