使用谓词从 pyarrow.parquet.ParquetDataset 过滤行

klu*_*luu 10 python amazon-s3 pandas parquet pyarrow

我有一个存储在 s3 上的镶木地板数据集,我想从数据集中查询特定行。我能够使用petastorm它来做到这一点,但现在我只想使用pyarrow.

这是我的尝试:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()
Run Code Online (Sandbox Code Playgroud)

但这会返回一个pandas DataFrame,就好像过滤器不起作用一样,即我有具有不同值的行event_name。有什么我遗漏的或我误解的吗?我可以在获得 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存空间。

rju*_*ney 14

注意:我在这篇文章中将扩展为 Python 和 Parquet 的综合指南

镶木地板格式分区

为了使用过滤器,您需要使用分区以 Parquet 格式存储数据。从多个 Parquet 列和分区中加载几个 Parquet 可以显着提高 Parquet 与 CSV 的 I/O 性能。Parquet 可以根据一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建目录树,或者为一个分区列创建一组目录。该PySpark木地板文档解释了木地板是如何工作的相当好。

性别和国家的分区如下所示

path
??? to
    ??? table
        ??? gender=male
        ?   ??? ...
        ?   ?
        ?   ??? country=US
        ?   ?   ??? data.parquet
        ?   ??? country=CN
        ?   ?   ??? data.parquet
        ?   ??? ...
Run Code Online (Sandbox Code Playgroud)

如果您需要进一步对数据进行分区,还有行组分区,但大多数工具仅支持指定行组大小,您必须自己进行key-->row group查找,这很丑陋(很高兴在另一个问题中回答)。

用 Pandas 写分区

您需要使用 Parquet 对数据进行分区,然后您可以使用过滤器加载它。对于大型数据集,您可以使用 PyArrow、pandas 或DaskPySpark在分区中写入数据。

例如,要在 Pandas 中写入分区:

df.to_parquet(
    path='analytics.xxx', 
    engine='pyarrow',
    compression='snappy',
    columns=['col1', 'col5'],
    partition_cols=['event_name', 'event_category']
)
Run Code Online (Sandbox Code Playgroud)

这将文件布局如下:

path
??? to
    ??? table
        ??? gender=male
        ?   ??? ...
        ?   ?
        ?   ??? country=US
        ?   ?   ??? data.parquet
        ?   ??? country=CN
        ?   ?   ??? data.parquet
        ?   ??? ...
Run Code Online (Sandbox Code Playgroud)

在 PyArrow 中加载 Parquet 分区

要使用分区列通过一个属性获取事件,您可以在列表中放置一个元组过滤器:

df.to_parquet(
    path='analytics.xxx', 
    engine='pyarrow',
    compression='snappy',
    columns=['col1', 'col5'],
    partition_cols=['event_name', 'event_category']
)
Run Code Online (Sandbox Code Playgroud)

使用逻辑 AND 过滤

要使用 AND 获取具有两个或多个属性的事件,您只需创建一个过滤器元组列表:

analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
Run Code Online (Sandbox Code Playgroud)

使用逻辑 OR 进行过滤

要使用 OR 获取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()
Run Code Online (Sandbox Code Playgroud)

使用 AWS Data Wrangler 加载 Parquet 分区

正如提到的另一个答案,将数据过滤加载到特定分区中的特定列的最简单方法是使用该awswrangler模块。如果您使用的是 S3,请查看awswrangler.s3.read_parquet()和的文档awswrangler.s3.to_parquet()。过滤的工作方式与上述示例相同。

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        ('event_name',     '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()
Run Code Online (Sandbox Code Playgroud)

加载 Parquet 分区 pyarrow.parquet.read_table()

如果您使用的是 PyArrow,您还可以使用pyarrow.parquet.read_table()

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()
Run Code Online (Sandbox Code Playgroud)

使用 PySpark 加载 Parquet 分区

最后,在 PySpark 中你可以使用 pyspark.sql.DataFrameReader.read_parquet()

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Stack Overflow Example Parquet Column Load') \
                    .getOrCreate()

# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
          .select('event_name', 'event_category') \
          .filter(F.col('event_name') == 'SomeEvent')
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助您使用 Parquet :)


Nik*_*s B 12

对于从 Google 来到这里的任何人,您现在可以在读取 Parquet 文件时过滤 PyArrow 中的行。无论您是通过 pandas 还是 pyarrow.parquet 阅读它。

文档

过滤器(列表[元组]或列表[列表[元组]]或无(默认))——不匹配过滤谓词的行将从扫描数据中删除。如果文件不包含匹配的行,将利用嵌入在嵌套目录结构中的分区键来避免加载文件。如果 use_legacy_dataset 为 True,则过滤器只能引用分区键,并且仅支持 hive 样式的目录结构。将 use_legacy_dataset 设置为 False 时,还支持文件内级别过滤和不同的分区方案。

谓词以析取范式 (DNF) 表示,例如 [[('x', '=', 0), ...], ...]。DNF 允许单列谓词的任意布尔逻辑组合。最里面的元组每个描述一个单列谓词。内部谓词列表被解释为连词 (AND),形成更具选择性和多列的谓词。最后,最外面的列表将这些过滤器组合为析取 (OR)。

谓词也可以作为 List[Tuple] 传递。这种形式被解释为单个连词。要在谓词中表达 OR,必须使用(首选)List[List[Tuple]] 表示法。


jor*_*ris 5

目前,该filters功能仅在文件级别实现,尚未在行级别实现。

因此,如果您有一个数据集作为嵌套层次结构中多个分区 parquet 文件的集合(此处描述的分区数据集的类型:https ://arrow.apache.org/docs/python/parquet.html#partitioned-datasets- multiple-files),您可以使用该filters参数仅读取文件的子集。
但是,您还不能使用它来仅读取单个文件的行组的子集(请参阅https://issues.apache.org/jira/browse/ARROW-1796)。

但是,如果您能收到指定此类无效过滤器的错误消息,那就太好了。我为此提出了一个问题:https ://issues.apache.org/jira/browse/ARROW-5572


Vin*_*aes 5

对于 python 3.6+,AWS 有一个名为 aws-data-wrangler 的库,它有助于 Pandas/S3/Parquet 之间的集成,并且允许您过滤分区的 S3 键。

安装做;

pip install awswrangler
Run Code Online (Sandbox Code Playgroud)

为了减少读取的数据,您可以根据存储在 s3 上的 parquet 文件中的分区列来过滤行。event_name使用值do从分区列中过滤行"SomeEvent"

对于 awswrangler < 1.0.0

import awswrangler as wr

df = wr.pandas.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)
Run Code Online (Sandbox Code Playgroud)

对于 awswrangler > 1.0.0 执行;

import awswrangler as wr

df = wr.s3.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)
Run Code Online (Sandbox Code Playgroud)

  • 看起来这并不是一个真正的错误。对我的错误报告的回应是:“不幸的是,AWS Data Wrangler 不支持物理列上的过滤器,仅支持分区上的过滤器。(在上面的提交中更新了文档)。这不仅仅是传递 use_legacy_dataset=False 的问题,似乎新数据集方法不支持接收 boto3 会话。” 也许编辑这个答案以强调过滤仅适用于分区而不是物理列是个好主意? (2认同)