Tags: java, file-upload, amazon-s3, aws-sdk
I want to compress data that is created on the fly with GZIP and upload it to S3, with each compressed file holding roughly 1 GB of data.
Since the files are large, and I am processing several of them in parallel, I cannot keep all the data in memory, and I would like to stream it to S3 as soon as possible.
Also, I cannot know the exact size of the compressed data in advance. I read the question "Can I upload a file to S3 without a Content-Length header?", but I don't know how to combine that approach with GZIPing.
I think I could pull this off if I could create a GZIPOutputStream, feed it the data piece by piece, and at the same time read back chunks of the compressed data (hopefully 5 MB each) and upload them to S3 using Amazon S3 multipart upload.
Is what I am trying to do even possible? Or is my only option to compress the data to local storage (my HDD) and then upload the compressed file?
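For what it's worth, the disk-based fallback mentioned above, compressing to a local file first and then uploading the finished file, is straightforward. A minimal sketch, where the `s3client` variable, the `writeData` producer, and the bucket/key names are placeholders:

import java.io.File;
import java.io.FileOutputStream;
import java.util.zip.GZIPOutputStream;

// Compress everything to a temporary file, then upload it in one shot.
// The SDK takes the Content-Length from the file, so no streaming tricks are needed.
File tmp = File.createTempFile("upload", ".gz");
try (GZIPOutputStream gzip = new GZIPOutputStream(new FileOutputStream(tmp))) {
    writeData(gzip); // hypothetical producer of the raw data
}
s3client.putObject("<YourBucket>", "<YourKey>.gz", tmp);
tmp.delete();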
No answer was accepted, so here is how I did it:
package roee.gavriel;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class S3UploadStream extends OutputStream {

    private static final int PART_SIZE = 5 * 1024 * 1024;

    private final AmazonS3 s3client;
    private final String bucket;
    private final String key;
    // The upload id AWS assigns to the multipart upload.
    private final String uploadId;
    // AWS returns one tag per part and expects them all back when the upload is completed.
    private final List<PartETag> partETags = new LinkedList<>();
    // A buffer that collects the data before sending it to AWS.
    private byte[] partData = new byte[PART_SIZE];
    // The index of the next free byte in the buffer.
    private int partDataIndex = 0;
    // Total number of parts uploaded so far.
    private int totalPartCountIndex = 0;
    private volatile boolean closed = false;
    // Internal thread pool which handles the actual part uploading.
    private final ThreadPoolExecutor executor;

    public S3UploadStream(AmazonS3 s3client, String bucket, String key, int uploadThreadsCount) {
        this.s3client = s3client;
        this.bucket = bucket;
        this.key = key;
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key);
        InitiateMultipartUploadResult initResponse = s3client.initiateMultipartUpload(initRequest);
        this.uploadId = initResponse.getUploadId();
        this.executor = new ThreadPoolExecutor(uploadThreadsCount, uploadThreadsCount, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100));
    }

    @Override
    public synchronized void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Trying to write to a closed S3UploadStream");
        }
        partData[partDataIndex++] = (byte) b;
        uploadPart(false);
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        // Flush whatever is left in the buffer.
        uploadPart(true);
        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Nothing to do here...
        }
        // S3 requires the part list in ascending part-number order, but the
        // upload threads may have finished out of order, so sort first.
        partETags.sort(Comparator.comparingInt(PartETag::getPartNumber));
        // Complete the multipart upload.
        CompleteMultipartUploadRequest compRequest =
                new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
        s3client.completeMultipartUpload(compRequest);
    }

    private synchronized void uploadPart(boolean force) {
        if (!force && partDataIndex < PART_SIZE) {
            // The API requires that only the last part may be smaller than 5 MB.
            return;
        }
        if (partDataIndex == 0) {
            // Nothing buffered; don't upload an empty part.
            return;
        }
        // Hand the full buffer to an upload task...
        createUploadPartTask(partData, partDataIndex);
        // ...and start buffering data into a fresh part.
        partData = new byte[PART_SIZE];
        partDataIndex = 0;
    }

    private synchronized void createUploadPartTask(byte[] partData, int partDataIndex) {
        // Wrap the part data in an input stream.
        InputStream stream = new ByteArrayInputStream(partData, 0, partDataIndex);
        // Build the upload request.
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(++totalPartCountIndex)
                .withInputStream(stream)
                .withPartSize(partDataIndex);
        // Run the actual upload in a pool thread and record the returned tag.
        executor.execute(() -> {
            PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag();
            synchronized (partETags) {
                partETags.add(partETag);
            }
        });
    }
}
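One thing the class above does not handle is failure: if uploading a part throws, the multipart upload stays open on the AWS side (and its parts keep accumulating storage charges) until it is explicitly aborted. A minimal sketch of the cleanup, assuming it is added as a method of the same class so it can reuse the `s3client`, `bucket`, `key`, and `uploadId` fields:

import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;

// Call this instead of completeMultipartUpload when any part failed;
// AWS then discards the parts that were already uploaded.
private void abortUpload() {
    s3client.abortMultipartUpload(
            new AbortMultipartUploadRequest(bucket, key, uploadId));
}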
Here is a small snippet of code that uses it to write many GUIDs to a GZIP file on S3:
int writeThreads = 3;
int genThreads = 10;
int guidPerThread = 200_000;
try (S3UploadStream uploadStream = new S3UploadStream(s3client, "<YourBucket>", "<YourKey>.gz", writeThreads)) {
    try (GZIPOutputStream stream = new GZIPOutputStream(uploadStream)) {
        Semaphore s = new Semaphore(0);
        for (int t = 0; t < genThreads; ++t) {
            new Thread(() -> {
                for (int i = 0; i < guidPerThread; ++i) {
                    try {
                        // GZIPOutputStream is not thread safe, so serialize the
                        // writes and keep each GUID and its newline together.
                        synchronized (stream) {
                            stream.write(java.util.UUID.randomUUID().toString().getBytes());
                            stream.write('\n');
                        }
                    } catch (IOException e) {
                        // Ignored for brevity.
                    }
                }
                s.release();
            }).start();
        }
        s.acquire(genThreads);
    }
}
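To sanity-check the result, the object can be streamed back and decompressed on the fly. A short sketch, assuming the same `s3client` and the bucket/key used above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
import com.amazonaws.services.s3.model.S3Object;

try (S3Object obj = s3client.getObject("<YourBucket>", "<YourKey>.gz");
     BufferedReader reader = new BufferedReader(new InputStreamReader(
             new GZIPInputStream(obj.getObjectContent())))) {
    // Print the first GUID line as a quick smoke test.
    System.out.println(reader.readLine());
}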