Sha*_*esh 4 java amazon-s3 zipinputstream
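Uploading large files to AWS S3 with multipart upload from a stream instead of Lambda's /tmp: the file is uploaded, but not completely
In my case, the size of each file inside the zip is unpredictable; a single file can be as large as 1 GiB. So I read the zip from S3 with ZipInputStream and want to upload its entries back to S3. Because I am working in Lambda, the files are too large to store in Lambda's /tmp. I therefore tried to use S3 multipart upload to read and upload directly to S3 without saving anything in /tmp. But I ran into a problem: the files are not written completely. I suspect the file is being overwritten each time. Please review my code and help.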
// outputFolder and filePaths are fields of the enclosing class (not shown in the question).
public void zipAndUpload() {
    byte[] buffer = new byte[1024];
    try {
        File folder = new File(outputFolder);
        if (!folder.exists()) {
            folder.mkdir();
        }
        AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        S3Object object = s3Client.getObject("mybucket.s3.com", "MyFilePath/MyZip.zip");
        TransferManager tm = TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .build();
        ZipInputStream zis =
                new ZipInputStream(object.getObjectContent());
        ZipEntry ze = zis.getNextEntry();
        while (ze != null) {
            String fileName = ze.getName();
            System.out.println("ZE " + ze + " : " + fileName);
            File newFile = new File(outputFolder + File.separator + fileName);
            if (ze.isDirectory()) {
                System.out.println("DIRECTORY" + newFile.mkdirs());
            } else {
                filePaths.add(newFile);
                int len;
                while ((len = zis.read(buffer)) > 0) {
                    ObjectMetadata meta = new ObjectMetadata();
                    meta.setContentLength(len);
                    InputStream targetStream = new ByteArrayInputStream(buffer);
                    PutObjectRequest request = new PutObjectRequest("mybucket.s3.com", fileName, targetStream, meta);
                    request.setGeneralProgressListener(new ProgressListener() {
                        public void progressChanged(ProgressEvent progressEvent) {
                            System.out.println("Transferred bytes: " + progressEvent.getBytesTransferred());
                        }
                    });
                    Upload upload = tm.upload(request);
                }
            }
            ze = zis.getNextEntry();
        }
        zis.closeEntry();
        zis.close();
        System.out.println("Done");
    } catch (IOException ex) {
        ex.printStackTrace();
    }
}
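The problem is your inner while loop. Essentially, you read up to 1024 bytes from the ZipInputStream and upload them to S3 as a complete object. Instead of streaming to S3, you overwrite the target key again and again.
The solution is a bit more involved because you do not have one stream per file, but one stream per zip container. This means you cannot do something like the following, because AWS closes the stream once the first upload has finished: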
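To make the overwrite effect concrete, here is a minimal sketch that runs without AWS. The `bucket` map and the `putObject` helper are hypothetical stand-ins (not SDK calls) that only imitate one property of S3: a put to an existing key replaces the whole object. The loop mirrors the shape of the loop in the question, with a 4-byte buffer instead of 1024 bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class OverwriteDemo {
    // Hypothetical stand-in for a bucket: a put to an existing key replaces the object.
    static final Map<String, byte[]> bucket = new HashMap<>();

    // Hypothetical stand-in for a put request: stores contentLength bytes under key.
    static void putObject(String key, InputStream body, int contentLength) throws IOException {
        byte[] data = new byte[contentLength];
        int off = 0;
        while (off < contentLength) {
            int n = body.read(data, off, contentLength - off);
            if (n < 0) {
                break;
            }
            off += n;
        }
        bucket.put(key, data);
    }

    // Same shape as the question's inner loop: one put per buffer read.
    static byte[] uploadPerChunk(byte[] source, int bufferSize) throws IOException {
        InputStream in = new ByteArrayInputStream(source);
        byte[] buffer = new byte[bufferSize];
        int len;
        while ((len = in.read(buffer)) > 0) {
            putObject("file.txt", new ByteArrayInputStream(buffer, 0, len), len);
        }
        return bucket.get("file.txt");
    }

    public static void main(String[] args) throws IOException {
        byte[] source = "0123456789".getBytes(StandardCharsets.US_ASCII);
        byte[] stored = uploadPerChunk(source, 4);
        // Only the last chunk survives under the key.
        System.out.println(new String(stored, StandardCharsets.US_ASCII)); // prints "89"
    }
}
```

Each chunk ends up as a separate complete object under the same key, so only the final chunk remains. That matches the symptom described in the question.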
// Not possible
PutObjectRequest request = new PutObjectRequest(targetBucket, name,
zipInputStream, meta);
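For comparison, a different technique from the piped approach used in this answer is to hand the consumer a wrapper whose close() does nothing, so the shared ZipInputStream stays open for the next entry. The sketch below illustrates this with the standard library only. `NonClosingInputStream` and `consumeAndClose` are hypothetical names, and `consumeAndClose` merely imitates a consumer that closes whatever stream it is given. Whether this is enough with the real SDK is an assumption to verify, since the upload still needs a reliable content length.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class NonClosingDemo {
    /** Passes reads through but ignores close(), so the wrapped stream stays open. */
    static final class NonClosingInputStream extends FilterInputStream {
        NonClosingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() {
            // Intentionally a no-op: the owner of the zip stream closes it later.
        }
    }

    // Hypothetical consumer that reads to end-of-stream and then closes its input.
    static byte[] consumeAndClose(InputStream in) throws IOException {
        try (InputStream s = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = s.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }

    // Builds a small two-entry zip in memory as test input.
    static byte[] buildZip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry("a.txt"));
            zos.write("hello".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("dir/b.txt"));
            zos.write("world!!".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bytes.toByteArray();
    }

    // Reads every entry through a non-closing wrapper; the zip stream survives each consumer.
    static Map<String, String> readAll(byte[] zipBytes) throws IOException {
        Map<String, String> result = new LinkedHashMap<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                byte[] data = consumeAndClose(new NonClosingInputStream(zis));
                result.put(entry.getName(), new String(data, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(buildZip()));
    }
}
```

This works because ZipInputStream reports end-of-stream at the end of each entry, so the consumer sees exactly one entry's bytes.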
You have to write the ZipInputStream into a PipedOutputStream object, one for each ZipEntry. Below is a working example:
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class Pipes {
    public static void main(String[] args) throws IOException, InterruptedException {
        Regions clientRegion = Regions.DEFAULT_REGION;
        String sourceBucket = "<sourceBucket>";
        String key = "<sourceArchive.zip>";
        String targetBucket = "<targetBucket>";
        PipedOutputStream out = null;
        PipedInputStream in = null;
        S3Object s3Object = null;
        ZipInputStream zipInputStream = null;
        TransferManager transferManager = null;
        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withRegion(clientRegion)
                    .withCredentials(new ProfileCredentialsProvider())
                    .build();
            transferManager = TransferManagerBuilder.standard()
                    .withS3Client(s3Client)
                    .build();
            System.out.println("Downloading an object");
            s3Object = s3Client.getObject(new GetObjectRequest(sourceBucket, key));
            zipInputStream = new ZipInputStream(s3Object.getObjectContent());
            ZipEntry zipEntry;
            while (null != (zipEntry = zipInputStream.getNextEntry())) {
                // Note: getSize() returns -1 when the size is not known up front.
                long size = zipEntry.getSize();
                String name = zipEntry.getName();
                if (zipEntry.isDirectory()) {
                    System.out.println("Skipping directory " + name);
                    continue;
                }
                System.out.printf("Processing ZipEntry %s : %d bytes%n", name, size);
                // Pipe the current entry into a fresh InputStream that the upload can consume and close.
                out = new PipedOutputStream();
                in = new PipedInputStream(out);
                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(size);
                PutObjectRequest request = new PutObjectRequest(targetBucket, name, in, metadata);
                // The upload runs asynchronously and reads from the pipe while we write to it below.
                Upload upload = transferManager.upload(request);
                long actualSize = copy(zipInputStream, out, 1024);
                if (actualSize != size) {
                    throw new RuntimeException("Filesize of ZipEntry " + name + " is wrong");
                }
                out.flush();
                out.close();
                // Wait until this entry is fully uploaded before moving on to the next one.
                upload.waitForCompletion();
            }
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                in.close();
            }
            if (s3Object != null) {
                s3Object.close();
            }
            if (zipInputStream != null) {
                zipInputStream.close();
            }
            if (transferManager != null) {
                // Stop the transfer threads so the JVM can exit without System.exit().
                transferManager.shutdownNow();
            }
        }
    }

    // Copies input to output until end-of-stream and returns the number of bytes copied.
    private static long copy(final InputStream input, final OutputStream output, final int buffersize) throws IOException {
        if (buffersize < 1) {
            throw new IllegalArgumentException("buffersize must be bigger than 0");
        }
        final byte[] buffer = new byte[buffersize];
        int n = 0;
        long count = 0;
        while (-1 != (n = input.read(buffer))) {
            output.write(buffer, 0, n);
            count += n;
        }
        return count;
    }
}
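The pipe mechanism itself can be tried without AWS. The following is a minimal sketch under stated assumptions: `drain` is a hypothetical consumer running on a second thread in place of the asynchronous upload, and the zip is built in memory. It shows the two points that matter: one fresh pipe pair per entry, and waiting for the consumer to finish before moving on to the next entry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class PipeDemo {
    // Hypothetical consumer standing in for the upload: reads to end-of-stream, then closes.
    static int drain(InputStream in) throws IOException {
        try (InputStream s = in) {
            byte[] buf = new byte[1024];
            int total = 0;
            int n;
            while ((n = s.read(buf)) != -1) {
                total += n;
            }
            return total;
        }
    }

    // Builds a small two-entry zip in memory as test input.
    static byte[] buildZip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            zos.putNextEntry(new ZipEntry("a.txt"));
            zos.write("hello".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("dir/b.txt"));
            zos.write("world!!".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bytes.toByteArray();
    }

    // Pipes each entry to a consumer thread and records how many bytes the consumer received.
    static Map<String, Integer> pipeEntries(byte[] zipBytes) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Map<String, Integer> received = new LinkedHashMap<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                // One fresh pipe pair per entry; the consumer may close its end freely.
                PipedOutputStream out = new PipedOutputStream();
                PipedInputStream in = new PipedInputStream(out);
                Callable<Integer> task = () -> drain(in);
                Future<Integer> consumer = pool.submit(task);
                try (PipedOutputStream o = out) {
                    byte[] buf = new byte[1024];
                    int n;
                    while ((n = zis.read(buf)) != -1) {
                        o.write(buf, 0, n);
                    }
                } // Closing the writer signals end-of-stream to the consumer.
                // Wait for the consumer before touching the next entry.
                received.put(entry.getName(), consumer.get());
            }
        } finally {
            pool.shutdown();
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pipeEntries(buildZip()));
    }
}
```

The writer and the reader must run on different threads. A pipe has a small fixed buffer, so writing a large entry on the same thread that is supposed to read it would block.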