How do I write pyarrow parquet data to an S3 bucket?

Kha*_*han 2 python amazon-s3 boto3 pyarrow

I created a DataFrame and converted it to a parquet file using pyarrow (also mentioned here):

import pyarrow as pa
import pyarrow.parquet as pq

def convert_df_to_parquet(self, df):
    table = pa.Table.from_pandas(df)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf

Now I want to upload it to an S3 bucket. I tried different input parameters for upload_file() and put_object(), but nothing I tried worked:

s3_client.upload_file(parquet_file, bucket_name, destination_key)  # 1st
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file)  # 2nd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.getvalue())  # 3rd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())  # 4th

Error:

 s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())
  File "pyarrow/io.pxi", line 376, in pyarrow.lib.NativeFile.read1
  File "pyarrow/io.pxi", line 310, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 320, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 155, in pyarrow.lib.NativeFile.get_input_stream
  File "pyarrow/io.pxi", line 170, in pyarrow.lib.NativeFile._assert_readable
OSError: only valid on readonly files

Igo*_*res 6

Why not write it directly (Pandas -> S3) and speed things up?

import awswrangler as wr

# Note: this is the awswrangler 0.x API; in awswrangler 1.0 and later the
# equivalent call is wr.s3.to_parquet(df=df, path=..., ...).
wr.pandas.to_parquet(
    dataframe=df,
    path="s3://...",
    dataset=True,
    mode="overwrite",        # Could be append, overwrite or overwrite_partitions
    database="my_database",  # Optional, only if you want it available in the Athena/Glue Catalog
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"],
)

Reference


Ton*_*ony 6

One of the more annoying things about pandas is that if your token expires during the script, df.to_parquet("s3://...") will raise PermissionError even if you are using boto3.Session(). To get around this, I wrote my own to_parquet():

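import boto3
import pyarrow as pa
import pyarrow.parquet as pq


def to_parquet(df, s3_path):
    """Write df as a parquet object to s3_path, which must start with s3://."""
    # Strip the "s3://" prefix, then split the rest into bucket and key.
    parts = s3_path[5:].split("/")
    bucket, key = (parts[0], "/".join(parts[1:]))

    # Serialize the DataFrame to parquet in memory.
    table = pa.Table.from_pandas(df)
    writer = pa.BufferOutputStream()
    pq.write_table(table, writer)
    body = bytes(writer.getvalue())  # getvalue() returns a pyarrow Buffer

    # Create the session explicitly (credentials elided in the original).
    session = boto3.Session(<private variables>)
    s3 = session.client("s3")
    s3.put_object(Body=body, Bucket=bucket, Key=key)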
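A possible variation on the function above, offered as a sketch and not a drop-in replacement: the path splitting and the upload can be pulled into two small helpers so that a malformed path fails early and the client is created at upload time. The names split_s3_uri, put_bytes and client_factory are hypothetical (they are not part of boto3 or pyarrow); the only thing assumed about the client is that it has a put_object(Body=..., Bucket=..., Key=...) method, as the boto3 S3 client does.

```python
from typing import Any, Callable, Tuple

S3_PREFIX = "s3://"


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key').

    Hypothetical helper: raises ValueError instead of silently
    producing a wrong bucket or an empty key.
    """
    if not s3_uri.startswith(S3_PREFIX):
        raise ValueError(f"not an s3:// URI: {s3_uri!r}")
    # Everything up to the first "/" is the bucket, the rest is the key.
    bucket, _, key = s3_uri[len(S3_PREFIX):].partition("/")
    if not bucket or not key:
        raise ValueError(f"bucket or key missing in {s3_uri!r}")
    return bucket, key


def put_bytes(body: bytes, s3_uri: str, client_factory: Callable[[], Any]) -> None:
    """Upload in-memory bytes to s3_uri.

    client_factory is a hypothetical zero-argument callable returning an
    object with a put_object method. It is called on every upload, so the
    caller decides how the session and its credentials are built.
    """
    bucket, key = split_s3_uri(s3_uri)
    client = client_factory()
    client.put_object(Body=body, Bucket=bucket, Key=key)
```

With boto3 installed and credentials configured, this could be called as put_bytes(body, "s3://...", lambda: boto3.Session().client("s3")), where body is the bytes(writer.getvalue()) value from the function above.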

Good luck!