How to read partitioned parquet files from S3 using pyarrow in Python

sto*_*eld 18

tags: python, parquet, arrow-python, fastparquet

I am looking for a way to read data from multiple partitioned directories on S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module has the ability to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It raised the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on pyarrow's documentation, I tried using s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

This raises the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I am limited to an ECS cluster, so spark/pyspark is not an option.

Is there a way to easily read the parquet files from these partitioned directories on S3 with Python? I feel that listing all the directories and then reading them is not a good practice, as suggested in this link. I need to convert the read data into a pandas dataframe for further processing, so I would prefer options related to fastparquet or pyarrow. I am also open to other options in Python.

Vin*_*aes 27

For Python 3.6+, AWS has a library called aws-data-wrangler that helps with the integration between Pandas/S3/Parquet.

To install, do:

pip install awswrangler

To read partitioned parquet from S3 using awswrangler 1.x.x and above, do:

import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)

By setting dataset=True, awswrangler expects partitioned parquet files. It will read all the individual parquet files from the partitions below the key you specify in path.
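
If you only need some of the partitions, recent awswrangler releases also accept a partition_filter callable; it receives each partition's column/value pairs (values arrive as strings) and returns whether that partition should be read. A minimal sketch, assuming the serial_number/cur_date layout from the question:

import awswrangler as wr

# read only partitions where serial_number == "1"
# (partition values are passed to the callable as strings)
df = wr.s3.read_parquet(
    path="s3://my_bucket/path/to/data_folder/",
    dataset=True,
    partition_filter=lambda partitions: partitions["serial_number"] == "1",
)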


sto*_*eld 19

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:

import s3fs
import fastparquet as fp
fs = s3fs.S3FileSystem()

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

#use s3fs as the filesystem
myopen = fs.open
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

Credit to martin for pointing me in the right direction via our conversation.

NOTE: Based on the benchmark below, this is slower than using pyarrow. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

I did a quick benchmark over individual iterations with pyarrow, and with the list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code. But I reckon pyarrow + s3fs will be faster once that support is implemented.

The code and benchmarks are below:

>>> # partition names, file list and accumulator used by test_pq below
>>> date_partition = 'cur_date'
>>> dma_partition = 'serial_number'
>>> list_parquet_files = fs.glob(path=s3_path)
>>> list_ = []
>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

Update 2019

After all the PRs, issues such as Arrow-2038 & Fast Parquet PR#182 have been resolved.

Read parquet files using Pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas() 
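
If you only need a subset of the partitions, ParquetDataset also takes a filters argument that prunes hive-style partition directories before any data is fetched. A minimal sketch, assuming the serial_number partition layout from the question (partition values are typically compared as strings):

>>> dataset = pq.ParquetDataset(
...     bucket_uri,
...     filesystem=fs,
...     filters=[('serial_number', '=', '1')],
... )
>>> df = dataset.read().to_pandas()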

Read parquet files using Fast parquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp
>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
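
As a usage note, fastparquet's to_pandas can restrict what gets materialised via a columns list; a small sketch with hypothetical column names:

>>> # only pull the columns you need (names here are placeholders)
>>> df = fp_obj.to_pandas(columns=['serial_number', 'cur_date', 'value'])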

Quick benchmarks

This is probably not the best way to benchmark it. Do read the blog post for a through-and-through benchmark.

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

Further reading regarding Pyarrow's speed


Sta*_*ger 10

For those of you who want to read in only parts of a partitioned parquet file, pyarrow accepts a list of keys as well as just the partial directory path to read in all parts of the partition. This method is especially useful for organizations that have partitioned their parquet datasets in a meaningful way, for example by year or country, allowing users to specify which parts of the file they need. This will reduce costs in the long run, since AWS charges per byte when reading in datasets.

# Read in user specified partitions of a partitioned parquet file 

import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()

keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
        'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']

bucket = 'bucket_yada_yada_yada'

# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
    parq_list.append('s3://'+bucket+'/'+key)

# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()
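
The snippet above passes explicit keys; as mentioned, pyarrow equally accepts just the partial directory path to read every file below one partition branch. A minimal sketch, assuming a hypothetical year=2019 partition under the same bucket:

import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()

# everything below this hypothetical partition directory is read
df = pq.ParquetDataset('s3://bucket_yada_yada_yada/keyname/year=2019/',
                       filesystem=s3).read_pandas().to_pandas()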


efb*_*own 5

This issue was resolved in a pull request in 2017.

For those who want to read parquet from S3 using only pyarrow, here is an example:

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"

# Python 3.6 or later
p_dataset = pq.ParquetDataset(
    f"s3://{bucket}/{path}",
    filesystem=fs
)
df = p_dataset.read().to_pandas()

# Pre-python 3.6
p_dataset = pq.ParquetDataset(
    "s3://{0}/{1}".format(bucket, path),
    filesystem=fs
)
df = p_dataset.read().to_pandas()