使用boto3 lib和AWS Lambda从位于S3存储桶中的压缩文件中获取数据流

She*_*hek 7 zip amazon-s3 python-3.x boto3 aws-lambda

我正在尝试为我的chron作业创建一个无服务器处理器.在这个作业中,我从我的一个客户端收到了我的S3存储桶中的压缩文件,文件50MB大小不一样但是一旦你解压缩它就会变得1.5GB很大而且很难限制AWS Lambda上可用的空间,500MB因为我无法从S3存储桶下载此文件并将其解压缩到我的Lambda,我成功地解压缩了我的文件并使用funzipunix脚本逐行从S3流式传输内容.

for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp $monkeydir/$x - | funzip
Run Code Online (Sandbox Code Playgroud)

我的名字:MonkeyBusiness 关键字:/Daily/Business/Banana/{current-date} 对象:banana.zip

但是现在因为我试图使用boto3实现相同的输出,我如何将压缩内容流式传输到sys i/o并解压缩流将内容保存在单独的文件中,每行除以10000行,然后将分块文件上传回S3.需要指导,因为我是AWS和boto3的新手.

如果您需要有关该工作的更多详细信息,请与我们联系.

下面给出的建议解决方案在这里不适用,因为zlib文档明确指出所述lib与gzip文件格式兼容,我的问题是zip文件格式.

import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv 
Run Code Online (Sandbox Code Playgroud)

She*_*hek 5

所以我使用 BytesIO 将压缩文件读入缓冲区对象,然后我使用 zipfile 将解压缩流作为未压缩数据打开,我能够逐行获取数据。

import io
import zipfile
import boto3
import sys

s3 = boto3.resource('s3', 'us-east-1')


def stream_zip_file():
    count = 0
    obj = s3.Object(
        bucket_name='MonkeyBusiness',
        key='/Daily/Business/Banana/{current-date}/banana.zip'
    )
    buffer = io.BytesIO(obj.get()["Body"].read())
    print (buffer)
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    for _ in foo2:
        line_counter += 1
    print (line_counter)
    z.close()


if __name__ == '__main__':
    stream_zip_file()
Run Code Online (Sandbox Code Playgroud)


moo*_*oot 1

这不是确切的答案。但你可以尝试一下这个。

首先,请修改提到的关于内存有限的 gzip 文件的答案,此方法允许人们逐块流式传输文件。boto3 S3 put_object() 和upload_fileobj似乎允许流式传输。

您需要将上述代码与以下解压进行混合和改编。

stream = cStringIO.StringIO()
stream.write(s3_data)
stream.seek(0)
blocksize = 1 << 16  #64kb
with gzip.GzipFile(fileobj=stream) as decompressor:
    while True:
        boto3.client.upload_fileobj(decompressor.read(blocksize), "bucket", "key")
Run Code Online (Sandbox Code Playgroud)

我不能保证上面的代码能够工作,它只是给你解压缩文件并按块重新上传的想法。您甚至可能需要将解压缩数据通过管道传输到 ByteIo,并通过管道传输到 upload_fileobj。有很多测试。

如果您不需要尽快解压缩文件,我的建议是使用 lambda 将文件放入 SQS 队列中。当有“足够”的文件时,触发一个 SPOT 实例(这将非常便宜),该实例将读取队列并处理文件。