Kha*_*han — python, amazon-s3, boto3, pyarrow
I created a dataframe and converted the df to a parquet file using pyarrow (also mentioned here):
import pyarrow as pa
import pyarrow.parquet as pq

def convert_df_to_parquet(self, df):
    # Convert the pandas DataFrame to an Arrow table and write it
    # as Parquet into an in-memory buffer.
    table = pa.Table.from_pandas(df)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf
Now I want to upload this to an S3 bucket. I have tried different input arguments for upload_file() and put_object(), but nothing I tried works:
s3_client.upload_file(parquet_file, bucket_name, destination_key)#1st
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file)#2nd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.getvalue())#3rd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())#4th
Error (from the 4th attempt):
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())
File "pyarrow/io.pxi", line 376, in pyarrow.lib.NativeFile.read1
File "pyarrow/io.pxi", line 310, in pyarrow.lib.NativeFile.read
File "pyarrow/io.pxi", line 320, in pyarrow.lib.NativeFile.read
File "pyarrow/io.pxi", line 155, in pyarrow.lib.NativeFile.get_input_stream
File "pyarrow/io.pxi", line 170, in pyarrow.lib.NativeFile._assert_readable
OSError: only valid on readonly files
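The traceback hints at the underlying issue: pa.BufferOutputStream() is write-only, so read1() is not allowed on it; the written data has to be read back with getvalue(). A minimal sketch of one way to pass it to put_object, reusing the names from the snippets above:

# BufferOutputStream cannot be read directly; getvalue() returns the written
# data as a pyarrow Buffer, and bytes() copies it into an object boto3 accepts.
body = bytes(parquet_file.getvalue())
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=body)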
Why not do it directly (Pandas -> S3) and speed things up?
import awswrangler as wr

wr.pandas.to_parquet(
    dataframe=df,
    path="s3://...",
    dataset=True,
    mode="overwrite",        # Could be append, overwrite or overwrite_partitions
    database="my_database",  # Optional, only if you want it available on Athena/Glue Catalog
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"],
)
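Note that wr.pandas.to_parquet is the pre-1.0 awswrangler API. In newer releases (now branded "AWS SDK for pandas") the module was reorganized, and the equivalent call, as far as I know, looks like this:

import awswrangler as wr

# In awswrangler >= 1.0 the function moved from wr.pandas to wr.s3 and the
# keyword changed from dataframe= to df= -- check the version you have installed.
wr.s3.to_parquet(
    df=df,
    path="s3://...",
    dataset=True,
    mode="overwrite",
    database="my_database",
    table="my_table",
    partition_cols=["PARTITION_COL_NAME"],
)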
One of the more annoying things about pandas is that if your token expires during the script, then df.to_parquet("s3://...") will raise a PermissionError even if you are using a fresh boto3.Session(). To work around this I wrote my own to_parquet:
import boto3
import pyarrow as pa
import pyarrow.parquet as pq

def to_parquet(df, s3_path):
    """Assumes path starts with s3://"""
    parts = s3_path[5:].split("/")
    bucket, key = parts[0], "/".join(parts[1:])
    # Write the DataFrame to an in-memory Parquet buffer.
    table = pa.Table.from_pandas(df)
    writer = pa.BufferOutputStream()
    pq.write_table(table, writer)
    body = bytes(writer.getvalue())
    # Upload with a freshly created session/client.
    session = boto3.Session(<private variables>)
    s3 = session.client("s3")
    s3.put_object(Body=body, Bucket=bucket, Key=key)
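Usage is then a single call; the path below is a hypothetical example, substitute your own bucket and key:

# Hypothetical bucket/key for illustration only.
to_parquet(df, "s3://my-bucket/some/prefix/data.parquet")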
Good luck!