Write a Pandas DataFrame with to_csv to a StringIO object instead of a file

Sop*_*unk 5 dataframe python-3.x export-to-csv pandas boto3

The objective of this code is to read an existing CSV file from a specified S3 bucket into a DataFrame, filter the DataFrame down to the desired columns, and then write the filtered DataFrame to a CSV object using StringIO so that I can upload it to a different S3 bucket.

Everything is working right now except the code block for the function "prepare_file_for_upload". Here is the full code:

from io import StringIO
import io  # unused at the moment
import logging
import pandas as pd
import boto3
from botocore.exceptions import ClientError

FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

#S3 parameters
source_bucket = 'REPLACE'
source_folder = 'REPLACE/'
dest_bucket = 'REPLACE'
dest_folder = 'REPLACE'
output_name = 'REPLACE'

def get_file_name():
    try:
        s3 = boto3.client("s3")
        logging.info(f'Determining filename from: {source_bucket}/{source_folder}')
        bucket_path = s3.list_objects(Bucket=source_bucket, Prefix=source_folder)
        file_name = [key['Key'] for key in bucket_path['Contents']][1]
        logging.info(file_name)
        return file_name
    except ClientError as e:
        logging.info(f'Unable to determine file name from bucket {source_bucket}/{source_folder}')
        logging.info(e)

def get_file_data(file_name):
    try:
        s3 = boto3.client("s3")
        logging.info(f'file name from get data: {file_name}')
        obj = s3.get_object(Bucket=source_bucket, Key=file_name)
        body = obj['Body']
        body_string = body.read().decode('utf-8')
        file_data = pd.read_csv(StringIO(body_string))
        #logging.info(file_data)
        return file_data
    except ClientError as e:
        logging.info(f'Unable to read {file_name} into dataframe')
        logging.info(e)

def filter_file_data(file_data):
    try:
        all_columns = list(file_data.columns)
        columns_used = ('col_1', 'col_2', 'col_3')
        desired_columns = [x for x in all_columns if x in columns_used]
        filtered_data = file_data[desired_columns]
        logging.info(type(filtered_data)) #for testing
        return filtered_data
    except Exception as e:
        logging.info('Unable to filter file')
        logging.info(e)

The block below is my attempt to write the existing DF that is passed to the function using the "to_csv" method with a StringIO buffer instead of creating a local file. to_csv will write to a local file but does not work with the buffer (and yes, I tried moving the buffer cursor to the start position afterwards, still nothing).

def prepare_file_for_upload(filtered_data): #this is the function block where I am stuck
    try:
        buffer = StringIO()
        output_name = 'FILE_NAME.csv'
        #code below is writing to file but can not get to write to buffer
        output_file = filtered_data.to_csv(buffer, sep=',')
        df = pd.DataFrame(buffer) #for testing
        logging.info(df) #for testing
        return output_file
    except Exception as e:
        logging.info(f'Unable to prepare {output_name} for upload')
        logging.info(e)

def upload_file(adjusted_file):
    try:
        #dest_key = f'{dest_folder}/{output_name}'
        dest_key = f'{output_name}'
        s3 = boto3.resource('s3')
        s3.meta.client.upload_file(adjusted_file, dest_bucket, dest_key)
    except ClientError as e:
        logging.info(f'Unable to upload {output_name} to {dest_key}')
        logging.info(e)

def execute_program():
    file_name = get_file_name()
    file_data = get_file_data(file_name)
    filtered_data = filter_file_data(file_data)
    adjusted_file = prepare_file_for_upload(filtered_data)
    upload_file = upload_file(adjusted_file)

if __name__ == '__main__':
    execute_program()

Sop*_*unk 13

The following solution worked for me:

csv_buffer = StringIO()
filtered_data.to_csv(csv_buffer)  # writes the CSV into the in-memory buffer (to_csv returns None here)
s3_resource = boto3.resource('s3')
s3_resource.Object(dest_bucket, output_name).put(Body=csv_buffer.getvalue())
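For completeness, here is a minimal sketch of how that fix could be folded back into the question's prepare_file_for_upload/upload_file pair. It reuses the question's placeholder names (dest_bucket, output_name); returning the buffer itself is an assumption about the intended flow, since to_csv returns None when handed a buffer:

from io import StringIO

import boto3
import pandas as pd

def prepare_file_for_upload(filtered_data):
    # to_csv returns None when given a buffer, so return the buffer itself
    buffer = StringIO()
    filtered_data.to_csv(buffer, sep=',')
    return buffer

def upload_file(buffer):
    # put() accepts the CSV text directly; no temporary file is needed
    s3 = boto3.resource('s3')
    s3.Object(dest_bucket, output_name).put(Body=buffer.getvalue())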


Nat*_*han 5

When working with BytesIO objects, pay careful attention to the order of operations. In your code you instantiate the BytesIO object and then fill it via the call to to_csv(). So far, so good. However, one thing you have to manage when working with a BytesIO object, unlike a file-based workflow, is the stream position.

After writing data to the stream, the stream position sits at the end of the stream. If you try to upload from that position, you may end up writing nothing at all! The operation completes without error, leaving you scratching your head as to why no results were written to S3. Add a call to seek() with the argument 0 to your function. Here is a demonstration program:

from io import BytesIO
import boto3
import pandas
from pandas import util

df = util.testing.makeMixedDataFrame()
s3_resource = boto3.resource("s3")
buffer = BytesIO()
df.to_csv(buffer, sep=",", index=False, mode="wb", encoding="UTF-8")

# The following call to `tell()` returns the stream position. 0 is the beginning of the file.
buffer.tell()
>> 134

# Reposition the stream to the beginning by calling `seek(0)` before uploading
buffer.seek(0)
s3_resource.Object("test-bucket", "test_df_from_resource.csv").put(Body=buffer.getvalue())

You should get a response similar to the following (with actual values):

>> {'ResponseMetadata': {'RequestId': 'request-id-value',
  'HostId': '###########',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '############',
   'x-amz-request-id': '00000',
   'date': 'Tue, 31 Aug 2021 00:00:00 GMT',
   'x-amz-server-side-encryption': 'value',
   'etag': '"xxxx"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"xxxx"',
 'ServerSideEncryption': 'value'}

Changing your code to reposition the stream should resolve the issue you are facing. It is also worth mentioning that pandas had a bug that caused unexpected behavior when writing to a bytes object. It has been fixed, and the example I provided assumes you are running a Python version greater than 3.8 and a pandas version greater than 1.3.2. Further information on IO can be found in the Python documentation.
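One nuance worth spelling out, tying the two answers together: the stream position only matters for APIs that read from the file object, such as read() or boto3's upload_fileobj(); getvalue(), as used in the accepted answer, returns the buffer's full contents regardless of position. A small sketch under those assumptions (the bucket and key names are placeholders):

from io import BytesIO

import boto3
import pandas as pd

df = pd.DataFrame({'col_1': [1, 2], 'col_2': ['a', 'b']})
buffer = BytesIO()
df.to_csv(buffer, index=False, mode='wb', encoding='utf-8')

# getvalue() ignores the stream position, so it works even without seek(0)
assert buffer.getvalue() == b'col_1,col_2\n1,a\n2,b\n'

# read() consumes from the current position; at end-of-stream it returns nothing
assert buffer.read() == b''

# APIs that read from the file object, like upload_fileobj(), need the rewind
buffer.seek(0)
s3 = boto3.resource('s3')
s3.Bucket('test-bucket').upload_fileobj(buffer, 'test_df.csv')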