Compress data and upload it to S3 without keeping the entire content in memory

Roe*_*rel 4 java file-upload amazon-s3 aws-sdk

I want to compress data that is generated on the fly with a GZIP stream and upload it to S3, and I would like each compressed file to hold roughly 1 GB of data.

Because the files are large and I am processing several of them in parallel, I cannot keep the whole data set in memory, and I would like to stream the data to S3 as soon as it is produced.

In addition, I cannot know the exact size of the compressed data in advance. I have read the question "Can I upload a file to S3 without a Content-Length header?", but I am not sure how to combine that with GZIPing.

I think I could pull this off if I could create a GZIPOutputStream, feed it the data piece by piece, and at the same time read chunks of the compressed output (hopefully 5 MB each) and upload them to S3 using a multipart upload.

Is what I am trying to do possible, or is my only option to compress the data to local storage (my hard drive) first and then upload the finished file? A rough sketch of the idea follows.
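Conceptually, this is the wiring I have in mind (a sketch only; S3MultipartOutputStream is a hypothetical class that would buffer the compressed bytes and ship them as ~5 MB multipart chunks, and writing it is exactly the part I am unsure about):

// Hypothetical wiring; S3MultipartOutputStream does not exist yet.
try (OutputStream s3out = new S3MultipartOutputStream(s3client, "bucket", "key.gz");
     GZIPOutputStream gzip = new GZIPOutputStream(s3out)) {
    while (hasMoreData()) {           // placeholder for my data source
        gzip.write(nextChunk());      // raw bytes in, compressed bytes stream out to S3
    }
}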

Roe*_*rel 5

No answer was accepted, so this is how I did it:

package roee.gavriel;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class S3UploadStream extends OutputStream {

    private static final int PART_SIZE = 5 * 1024 * 1024;

    private final AmazonS3 s3client;
    private final String bucket;
    private final String key;

    // The upload id given to the multiple parts upload by AWS.
    private final String uploadId;
    // A tag list. AWS returns one ETag for each part and expects all of them when the upload is completed.
    private final List<PartETag> partETags = new LinkedList<>();
    // A buffer to collect the data before sending it to AWS.
    private byte[] partData = new byte[PART_SIZE];
    // The index of the next free byte on the buffer.
    private int partDataIndex = 0;
    // Total number of parts that were uploaded.
    private int totalPartCountIndex = 0;
    private volatile boolean closed = false;
    // Internal thread pool which will handle the actual part uploading.
    private final ThreadPoolExecutor executor;

    public S3UploadStream(AmazonS3 s3client, String bucket, String key, int uploadThreadsCount) {
        this.s3client = s3client;
        this.bucket = bucket;
        this.key = key;
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key);
        InitiateMultipartUploadResult initResponse = s3client.initiateMultipartUpload(initRequest);
        this.uploadId = initResponse.getUploadId();
        this.executor = new ThreadPoolExecutor(uploadThreadsCount, uploadThreadsCount, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100),
                // Back-pressure: when the queue is full, the writer thread uploads the part itself
                // instead of the default policy throwing RejectedExecutionException.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }


    @Override
    public synchronized void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Trying to write to a closed S3UploadStream");
        }
        partData[partDataIndex++] = (byte)b;
        uploadPart(false);
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;

        // Flush the current data in the buffer
        uploadPart(true);

        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; there is nothing else useful to do here
            Thread.currentThread().interrupt();
        }

        // S3 expects the parts in ascending part-number order when completing the upload,
        // and the worker threads may have added their ETags out of order.
        partETags.sort(Comparator.comparingInt(PartETag::getPartNumber));

        // Complete the multipart upload
        CompleteMultipartUploadRequest compRequest =
                new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);

        s3client.completeMultipartUpload(compRequest);

    }

    private synchronized void uploadPart(boolean force) {

        if (!force && partDataIndex < PART_SIZE) {
            // The API allows only the last part to be smaller than 5 MB
            return;
        }

        // Actually start the upload
        createUploadPartTask(partData, partDataIndex);

        // We are going to upload the current part, so start buffering data to new part
        partData = new byte[PART_SIZE];
        partDataIndex = 0;          
    }

    private synchronized void createUploadPartTask(byte[] partData, int partDataIndex) {
        // Create an Input stream of the data
        InputStream stream = new ByteArrayInputStream(partData, 0, partDataIndex);

        // Build the upload request
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(++totalPartCountIndex)
                .withInputStream(stream)
                .withPartSize(partDataIndex);

        // Upload part and add response to our tag list.
        // Make the actual upload in a different thread
        executor.execute(() -> {
            PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag();
            synchronized (partETags) {
                partETags.add(partETag);
            }
        });
    }   
}
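One refinement I did not include above (just a sketch, untested): OutputStream falls back to calling write(int) once per byte, so GZIPOutputStream's bulk writes go through the synchronized single-byte path. Adding an override of write(byte[], int, int) to S3UploadStream that copies whole slices into the part buffer would avoid most of that per-byte overhead:

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        if (closed) {
            throw new IOException("Trying to write to a closed S3UploadStream");
        }
        while (len > 0) {
            // Copy as much as fits into the current part buffer
            int toCopy = Math.min(len, PART_SIZE - partDataIndex);
            System.arraycopy(b, off, partData, partDataIndex, toCopy);
            partDataIndex += toCopy;
            off += toCopy;
            len -= toCopy;
            // Ship the part once the buffer is full (uploadPart is a no-op otherwise)
            uploadPart(false);
        }
    }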

Here is a short snippet that uses it to write a lot of GUIDs into a GZIP file on S3:

// The enclosing method is assumed to declare "throws IOException, InterruptedException".
int writeThreads = 3;
int genThreads = 10;
int guidPerThread = 200_000;
try (S3UploadStream uploadStream = new S3UploadStream(s3client, "<YourBucket>", "<YourKey>.gz", writeThreads)) {
    try (GZIPOutputStream stream = new GZIPOutputStream(uploadStream)) {
        Semaphore s = new Semaphore(0);
        for (int t = 0; t < genThreads; ++t) {
            new Thread(() -> {
                for (int i = 0; i < guidPerThread; ++i) {
                    try {
                        // GZIPOutputStream is not thread safe, so serialize the writers
                        // and keep each GUID together with its newline
                        synchronized (stream) {
                            stream.write(java.util.UUID.randomUUID().toString().getBytes());
                            stream.write('\n');
                        }
                    } catch (IOException e) {
                        // Swallowed in this demo; real code should surface the failure
                    }
                }
                s.release();
            }).start();
        }
        s.acquire(genThreads);
    }
}
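Both snippets assume you already have an s3client. If you do not, a minimal way to build one with the AWS SDK for Java v1 (the region below is only an example; credentials come from the default provider chain) is:

AmazonS3 s3client = AmazonS3ClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)   // example region
        .build();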