在 AWS S3 中分块创建大型 zip 文件

Moj*_*imi 6 python amazon-s3 python-3.x boto3

所以,这个问题最终都是关于 python 和 S3 的。

假设我有一个包含这些文件的 S3 存储桶:

file1 --------- 2GB
file2 --------- 3GB
file3 --------- 1.9GB
file4 --------- 5GB
Run Code Online (Sandbox Code Playgroud)

这些文件是使用 S3 的预签名帖子 URL 上传的

我需要做的是让客户端能够将它们全部下载到 ZIP(或类似文件)中,但我无法在内存中或服务器存储中执行此操作,因为这是无服务器设置。

根据我的理解,理想情况下服务器需要:

  1. 在 S3 上启动 multipartupload 作业
  2. 可能需要向 multipart 作业发送一个块作为 zip 文件的标题;
  3. 以某种流的形式逐块下载存储桶中的每个文件,以免内存溢出
  4. 使用上面所说的流来创建一个 zip 块并在多部分作业中发送它
  5. 完成多部分作业和 zip 文件

现在,老实说,我不知道如何实现这一目标,甚至不知道是否有可能,但有些问题是:

  • 如何在 S3 中分块下载文件?最好使用 boto3 或 botocore
  • 如何在释放内存的同时分块创建​​ zip 文件?
  • 如何在 multipartupload 中连接这一切?

编辑:现在我想想,也许我什至不需要将 ZIP 文件放在 S3 中,我可以直接流式传输到客户端,对吗?那实际上会好得多

这是假设我在上面进行编辑的一些假设代码:

  #Let's assume Flask
  @app.route(/'download_bucket_as_zip'):
  def stream_file():
    def stream():
      #Probably needs to yield zip headers/metadata?
      for file in getFilesFromBucket():
         for chunk in file.readChunk(4000):
            zipchunk = bytesToZipChunk(chunk)
            yield zipchunk
    return Response(stream(), mimetype='application/zip')
Run Code Online (Sandbox Code Playgroud)

Lif*_*lex 3

你的问题非常复杂,因为解决它会让你陷入很多兔子洞。

我相信 Rahul Iyer 走在正确的轨道上,因为恕我直言,启动一个新的 EC2 实例并压缩该实例上的文件并将它们移回仅向客户端提供 zip 文件的 S3 存储桶会更容易。

如果您的文件较小,您可以在客户端请求文件时使用 AWS Cloudfront 来处理压缩。

在我的研究过程中,我确实注意到其他语言(例如 .Net 和 Java)具有处理流式传输到 zip 文件的 API。我还看了 zipstream,它已经分叉了好几次了。目前尚不清楚 zipstream 如何用于流式传输文件以进行压缩。

下面的代码将对文件进行分块并将块写入 zip 文件。输入文件接近 12Gbs,输出文件接近 5Gbs。

在测试过程中,我没有发现内存使用有任何重大问题或出现大峰值。

我确实在下面的一篇文章中添加了一些伪 S3 代码。我认为需要进行更多测试才能了解此代码如何在 S3 中的文件上工作。

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED

# This module is needed for ZIP_DEFLATED
import zlib


class UnseekableStream(RawIOBase):
def __init__(self):
    self._buffer = b''

def writable(self):
    return True

def write(self, b):
    if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer += b
    return len(b)

def get(self):
    chunk = self._buffer
    self._buffer = b''
    return chunk


def zipfile_generator(path, stream):
   with ZipFile(stream, mode='w') as zip_archive:
       z_info = ZipInfo.from_file(path)
       z_info.compress_type = ZIP_DEFLATED
       with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest: 
          for chunk in iter(lambda: entry.read(16384), b''): # 16384 is the maximum size of an SSL/TLS buffer.
             dest.write(chunk)
             yield stream.get()
 yield stream.get()


stream = UnseekableStream()
# each on the input files was 4gb
files = ['input.txt', 'input2.txt', 'input3.txt']
with open("test.zip", "wb") as f:
   for item in files:
      for i in zipfile_generator(item, stream):
         f.write(i)
         f.flush()
stream.close()
f.close()
Run Code Online (Sandbox Code Playgroud)

伪代码 s3/邮政编码

该代码是严格假设的,因为它需要测试。

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import os

import boto3

# This module is needed for ZIP_DEFLATED
import zlib

session = boto3.Session(
aws_access_key_id='XXXXXXXXXXXXXXXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='XXXXXXXXXX')

s3 = session.resource('s3')
bucket_name = s3.Bucket('bucket name')

class UnseekableStream(RawIOBase):
   def __init__(self):
      self._buffer = b''

   def writable(self):
      return True

   def write(self, b):
      if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer += b
    return len(b)

    def get(self):
      chunk = self._buffer
      self._buffer = b''
      return chunk


def zipfile_generator(path, stream):
   with ZipFile(stream, mode='w') as zip_archive:
       z_info = ZipInfo.from_file(path)
       z_info.compress_type = ZIP_DEFLATED
       with open(path, 'rb') as entry, zip_archive.open(z_info, mode='w') as dest:
           for chunk in iter(lambda: entry.read(16384), b''):
            dest.write(chunk)
              yield stream.get()
    yield stream.get()


stream = UnseekableStream()
with open("test.zip", "wb") as f:
   for file in bucket_name.objects.all():
     obj = s3.get_object(Bucket=bucket_name, Key=file.key)
     for i in zipfile_generator(obj.get(), stream):
        f.write(i)
        f.flush()
stream.close()
f.close()
Run Code Online (Sandbox Code Playgroud)