将 csv 从 S3 流式传输/分块到 Python

Ajj*_*nan 9 python amazon-s3 botocore boto3 amazon-s3-select

我打算使用 Python 对存储在 S3 中的非常大的 csv 文件执行一些内存密集型操作,目的是将脚本移动到 AWS Lambda。我知道我可以读入整个 csv nto 内存,但我肯定会遇到 Lambda 的内存和存储限制,有这么大的文件,有什么方法可以使用 boto3 将 csv 的块流式传输或一次读取到 Python 中/ botocore,理想情况下通过指定要读入的行号?

以下是我已经尝试过的一些事情:

1) 使用range参数 inS3.get_object指定要读入的字节范围。不幸的是,这意味着最后几行在中间被截断,因为无法指定要读入的行数。有一些混乱的解决方法,例如扫描最后一个换行符,记录索引,然后将其用作下一个字节范围的起点,但如果可能的话,我想避免这种笨拙的解决方案。

2) 使用 S3 select 编写 sql 查询以有选择地从 S3 存储桶中检索数据。不幸的row_numbers是,不支持 SQL 函数,而且看起来没有办法读取行的子集。

Kir*_*rst 9

假设您的文件未压缩,这应该涉及从流中读取并在换行符上拆分。读取一个数据块,找到该块中换行符的最后一个实例,拆分并处理。

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = '\n'.encode()   
partial_chunk = b''

while (True):
    chunk = partial_chunk + body.read(chunk_size)

    # If nothing was read there is nothing to process
    if chunk == b'':
        break

    last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[0:last_newline+1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline+1:]
Run Code Online (Sandbox Code Playgroud)

如果你有 gzip 文件,那么你需要在循环中使用BytesIOGzipFile类;这是一个更难的问题,因为您需要保留 Gzip 压缩细节。