如何使用pyarrow从S3读取镶木地板文件列表作为pandas数据框?

Die*_*des 25 python dataframe pandas boto3 pyarrow

我有一种使用boto3(1.4.4),pyarrow(0.4.1)和pandas(0.20.3)实现这一目标的hacky方法.

首先,我可以在本地读取单个镶木地板文件,如下所示:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()
Run Code Online (Sandbox Code Playgroud)

我也可以在本地读取镶木地板文件目录,如下所示:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()
Run Code Online (Sandbox Code Playgroud)

两者都像魅力一样.现在我想用存储在S3存储桶中的文件远程实现相同的功能.我希望这样的东西能起作用:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')
Run Code Online (Sandbox Code Playgroud)

但它没有:

OSError: Passed non-file path: s3n://dsn/to/my/bucket

在仔细阅读了pyarrow的文档后,目前似乎无法做到这一点.所以我提出了以下解决方案:

从S3读取单个文件并获取pandas数据帧:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()
Run Code Online (Sandbox Code Playgroud)

在这里,我的hacky,not-so-optimized,解决方案从S3文件夹路径创建一个pandas数据框:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来实现这一目标?也许某种使用pyarrow的熊猫连接器?我想避免使用pyspark,但如果没有其他解决方案,那么我会接受它.

vak*_*vak 27

您应该使用yjk21s3fs建议的模块.但是,如果调用ParquetDataset,您将获得一个pyarrow.parquet.ParquetDataset对象.要获得Pandas DataFrame,您宁愿申请它:.read_pandas().to_pandas()

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
Run Code Online (Sandbox Code Playgroud)

  • 当你这样做时会发生什么?它是流式传输还是复制到本地?如果我打开 10GB 文件 2 次,它会下载 2 次吗?如果我仅使用 5GB 本地存储打开 10GB 文件,它是全部传输还是全部下载怎么办? (3认同)
  • 当我指定所有 Parquet 文件所在的键时,我收到“ArrowIOError:无效的 Parquet 文件大小为 0 字节”。当我明确指定镶木地板文件时,它就可以工作。我使用 s3fs == 0.3.5 和 pyarrow == 0.15.0。@vak 知道为什么我不能像你一样读取 s3 键中的所有镶木地板文件吗? (2认同)

Lou*_*ang 21

谢谢!你的问题实际上告诉了我很多。这就是我现在使用pandas(0.21.1) 执行的操作,它将调用pyarrowboto3(1.3.1)。

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, 
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)
Run Code Online (Sandbox Code Playgroud)

然后您可以通过以下方式从 S3 读取文件夹下的多个镶木地板

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')
Run Code Online (Sandbox Code Playgroud)

(我想可以大大简化这段代码。)


Ric*_*ell 8

将云上的 parquet 数据读入数据帧的最简单方法可能是以这种方式使用dask.dataframe

import dask.dataframe as dd
df = dd.read_parquet('s3://bucket/path/to/data-*.parq')
Run Code Online (Sandbox Code Playgroud)

dask.dataframe可以从 Google Cloud Storage、Amazon S3、Hadoop 文件系统等读取


oya*_*163 7

也可以使用boto3来完成,而无需使用pyarrow

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())
Run Code Online (Sandbox Code Playgroud)

  • 我收到“AttributeError:'s3.Object'对象没有属性'download_fileobj'”。 (2认同)

ayo*_*rgo 6

如果您有正确的包设置

$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2
Run Code Online (Sandbox Code Playgroud)

以及适当配置 的 AWS 共享配置和凭证文件

您可以立即使用pandas

import pandas as pd

df = pd.read_parquet("s3://bucket/key.parquet")
Run Code Online (Sandbox Code Playgroud)

如果有多个 AWS配置文件,您可能还需要设置

$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible
Run Code Online (Sandbox Code Playgroud)

这样您就可以访问您的存储桶。

  • 谢谢!这绝对应该是公认的答案。 (3认同)

小智 5

您可以使用dask的s3fs,它为s3实现了文件系统接口。然后,您可以像下面这样使用ParquetDataset的文件系统参数:

import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)
Run Code Online (Sandbox Code Playgroud)


Igo*_*res 5

如果您也愿意使用AWS Data Wrangler

import awswrangler as wr

df = wr.s3.read_parquet(path="s3://...")
Run Code Online (Sandbox Code Playgroud)

  • 您的示例将需要一个镶木地板文件。您需要设置参数“dataset = True”来读取镶木地板文件列表。 (3认同)