Is there a way to upload a zip file extracted with "java.util.zip" to AWS S3 using multipart upload (Java high-level API)?

Sha*_*esh 4 java amazon-s3 zipinputstream

I need to upload large files to AWS S3 with a streaming multipart upload instead of staging them in Lambda's /tmp. The files get uploaded, but not completely.

In my case the size of each file inside the zip is unpredictable; a single file can be up to 1 GiB. So I read the archive from S3 with ZipInputStream and want to upload the entries back to S3. Because this runs in Lambda, the files are too large to save in Lambda's /tmp. I therefore tried to read and upload directly to S3 with an S3 multipart upload, without touching /tmp. But I ran into a problem: the files are not written completely. I suspect each file is being overwritten again and again. Please review my code and help.

public void zipAndUpload() {
    byte[] buffer = new byte[1024];
    try {
        File folder = new File(outputFolder);
        if (!folder.exists()) {
            folder.mkdir();
        }

        AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        S3Object object = s3Client.getObject("mybucket.s3.com", "MyFilePath/MyZip.zip");

        TransferManager tm = TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .build();

        ZipInputStream zis = new ZipInputStream(object.getObjectContent());
        ZipEntry ze = zis.getNextEntry();

        while (ze != null) {
            String fileName = ze.getName();
            System.out.println("ZE " + ze + " : " + fileName);

            File newFile = new File(outputFolder + File.separator + fileName);
            if (ze.isDirectory()) {
                System.out.println("DIRECTORY" + newFile.mkdirs());
            } else {
                filePaths.add(newFile);
                int len;
                while ((len = zis.read(buffer)) > 0) {

                    ObjectMetadata meta = new ObjectMetadata();
                    meta.setContentLength(len);
                    InputStream targetStream = new ByteArrayInputStream(buffer);

                    PutObjectRequest request = new PutObjectRequest("mybucket.s3.com", fileName, targetStream, meta);
                    request.setGeneralProgressListener(new ProgressListener() {
                        public void progressChanged(ProgressEvent progressEvent) {
                            System.out.println("Transferred bytes: " + progressEvent.getBytesTransferred());
                        }
                    });
                    Upload upload = tm.upload(request);
                }
            }
            ze = zis.getNextEntry();
        }

        zis.closeEntry();
        zis.close();
        System.out.println("Done");
    } catch (IOException ex) {
        ex.printStackTrace();
    }
}

Mic*_*eim 5

The problem is your inner while loop. Basically, you read 1024 bytes from the ZipInputStream and upload them to S3. Instead of streaming to S3, you overwrite the target key again and again.
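To see the effect without touching AWS, here is a minimal, self-contained sketch (entry name and sizes are made up) that counts how many separate PutObjectRequest calls the question's inner loop would issue for a single 3000-byte entry:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class OverwriteDemo {
    public static void main(String[] args) throws IOException {
        // Build a zip in memory with one 3000-byte entry (hypothetical data).
        ByteArrayOutputStream zipBytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(zipBytes)) {
            zos.putNextEntry(new ZipEntry("big.bin"));
            zos.write(new byte[3000]);
            zos.closeEntry();
        }

        byte[] buffer = new byte[1024];
        int chunks = 0;
        long total = 0;
        try (ZipInputStream zis = new ZipInputStream(
                new ByteArrayInputStream(zipBytes.toByteArray()))) {
            zis.getNextEntry();
            int len;
            while ((len = zis.read(buffer)) > 0) {
                // The question's code built a fresh PutObjectRequest here,
                // so each chunk replaced the previous one under the same key.
                chunks++;
                total += len;
            }
        }
        System.out.println("total=" + total);   // every byte was read...
        System.out.println("chunks=" + chunks); // ...but split over several separate "uploads"
    }
}
```

Since an S3 PUT to an existing key replaces the whole object, only the last chunk survives, which is why the uploaded files look truncated.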

The solution is a bit more complex, because you don't have one stream per file but one stream per zip container. That means you can't do something like the following, because AWS will close the stream after the first upload finishes:

// Not possible
PutObjectRequest request = new PutObjectRequest(targetBucket, name,
        zipInputStream, meta);

You have to write the ZipInputStream to a PipedOutputStream object, once per ZipEntry position. Below is a working example:

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class Pipes {
    public static void main(String[] args) throws IOException {

        Regions clientRegion = Regions.DEFAULT_REGION;
        String sourceBucket = "<sourceBucket>";
        String key = "<sourceArchive.zip>";
        String targetBucket = "<targetBucket>";

        PipedOutputStream out = null;
        PipedInputStream in = null;
        S3Object s3Object = null;
        ZipInputStream zipInputStream = null;

        try {
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withRegion(clientRegion)
                    .withCredentials(new ProfileCredentialsProvider())
                    .build();

            TransferManager transferManager = TransferManagerBuilder.standard()
                    .withS3Client(s3Client)
                    .build();

            System.out.println("Downloading an object");
            s3Object = s3Client.getObject(new GetObjectRequest(sourceBucket, key));
            zipInputStream = new ZipInputStream(s3Object.getObjectContent());

            ZipEntry zipEntry;
            while (null != (zipEntry = zipInputStream.getNextEntry())) {

                long size = zipEntry.getSize();
                String name = zipEntry.getName();
                if (zipEntry.isDirectory()) {
                    System.out.println("Skipping directory " + name);
                    continue;
                }

                System.out.printf("Processing ZipEntry %s : %d bytes\n", name, size);

                // take the copy of the stream and re-write it to an InputStream
                out = new PipedOutputStream();
                in = new PipedInputStream(out);

                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(size);

                PutObjectRequest request = new PutObjectRequest(targetBucket, name, in, metadata);

                transferManager.upload(request);

                long actualSize = copy(zipInputStream, out, 1024);
                if (actualSize != size) {
                    throw new RuntimeException("Filesize of ZipEntry " + name + " is wrong");
                }

                out.flush();
                out.close();
            }
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                in.close();
            }
            if (s3Object != null) {
                s3Object.close();
            }
            if (zipInputStream != null) {
                zipInputStream.close();
            }
            System.exit(0);
        }
    }

    private static long copy(final InputStream input, final OutputStream output, final int buffersize) throws IOException {
        if (buffersize < 1) {
            throw new IllegalArgumentException("buffersize must be bigger than 0");
        }
        final byte[] buffer = new byte[buffersize];
        int n = 0;
        long count=0;
        while (-1 != (n = input.read(buffer))) {
            output.write(buffer, 0, n);
            count += n;
        }
        return count;
    }
}
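Note that the example above stays single-threaded only because `transferManager.upload(request)` returns immediately and drains the `PipedInputStream` on the TransferManager's own background threads while `copy` fills the pipe; if producer and consumer shared one thread, the write would block as soon as the pipe's internal buffer filled. A minimal sketch of that producer/consumer mechanism without S3 (the 5000-byte payload and buffer size are arbitrary):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

public class PipeDemo {
    public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // Consumer thread: plays the role TransferManager plays in the answer,
        // draining the PipedInputStream while the producer is still writing.
        Thread consumer = new Thread(() -> {
            try {
                byte[] buf = new byte[256];
                int n;
                long total = 0;
                while ((n = in.read(buf)) != -1) {
                    total += n;
                }
                System.out.println("consumed=" + total);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        consumer.start();

        // Producer: the main thread writes into the pipe,
        // just like copy(zipInputStream, out, 1024) in the answer.
        out.write(new byte[5000]);
        out.close(); // signals end-of-stream to the consumer
        consumer.join();
    }
}
```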