Batch-uploading multiple files to Amazon S3 using the Java SDK

java hadoop amazon-s3 amazon-web-services aws-java-sdk

I am trying to upload multiple files to Amazon S3 under the same key, by appending the files. I have a list of file names and want to upload/append the files in that order. I am following this tutorial almost exactly, except that I loop over each file and upload it in parts. Because the files are on HDFS (`Path` is actually `org.apache.hadoop.fs.Path`), I use an input stream to send the file data. Below is some pseudocode, with comments noting where the code follows the tutorial verbatim:

// Create a list of PartETag objects. You get one of these for
// each part upload.
List<PartETag> partETags = new ArrayList<PartETag>();

// Step 1: Initialize.
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(
        bk.getBucket(), bk.getKey());
InitiateMultipartUploadResult initResponse =
        s3Client.initiateMultipartUpload(initRequest);
try {
    int i = 1; // part number, continuous across all files
    for (String file : files) {
        Path filePath = new Path(file);

        // Get the input stream and content length from HDFS
        long contentLength = fss.get(branch).getFileStatus(filePath).getLen();
        InputStream is = fss.get(branch).open(filePath);

        long filePosition = 0;
        while (filePosition < contentLength) {
            // Part size: up to 5 MB, capped at the bytes remaining in this file
            long partSize = Math.min(5 * 1024 * 1024, contentLength - filePosition);

            // Create the request to upload a part (verbatim from the tutorial)
            UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName(bk.getBucket())
                    .withKey(bk.getKey())
                    .withUploadId(initResponse.getUploadId())
                    .withPartNumber(i)
                    .withInputStream(is)
                    .withPartSize(partSize);

            // Upload the part and add the response's ETag to our list
            partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());

            filePosition += partSize;
            i++;
        }
        is.close();
    }
    // Step 3: Complete.
    CompleteMultipartUploadRequest compRequest = new
          CompleteMultipartUploadRequest(bk.getBucket(),
          bk.getKey(),
          initResponse.getUploadId(),
          partETags);

    s3Client.completeMultipartUpload(compRequest);
} catch (Exception e) {
      //...
}

However, I get the following error:

com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema (Service: Amazon S3; Status Code: 400; Error Code: MalformedXML; Request ID: 2C1126E838F65BB9), S3 Extended Request ID: QmpybmrqepaNtTVxWRM1g2w/fYW+8DPrDwUEK1XeorNKtnUKbnJeVM6qmeNcrPwc
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1109)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:741)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3743)
    at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:2617)

If anyone knows what might be causing this error, that would be much appreciated. Alternatively, if there is a better way to concatenate a bunch of files into a single S3 key, that would work too. I tried using Java's built-in SequenceInputStream, but that did not work. Any help would be greatly appreciated. For reference, the total size of all the files can be as large as 10-15 GB.
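One constraint worth keeping in mind with the code above: S3's multipart API requires every part except the last to be at least 5 MB. A small helper that splits a total length into part sizes respecting that floor might look like the following sketch (the class and method names are illustrative, not from the SDK):

```java
import java.util.ArrayList;
import java.util.List;

public class PartSizer {
    // S3's documented minimum size for every part except the last.
    static final long MIN_PART_SIZE = 5L * 1024 * 1024;

    // Split totalLength into chunk sizes of at most targetPartSize bytes.
    // Only the final chunk may fall below MIN_PART_SIZE.
    static List<Long> partSizes(long totalLength, long targetPartSize) {
        if (targetPartSize < MIN_PART_SIZE) {
            throw new IllegalArgumentException("target part size below S3 minimum");
        }
        List<Long> sizes = new ArrayList<>();
        long remaining = totalLength;
        while (remaining > 0) {
            long size = Math.min(targetPartSize, remaining);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }
}
```

For example, a 12 MB object split with a 5 MB target yields parts of 5 MB, 5 MB, and 2 MB, which is valid because only the last part is under the minimum.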

lca*_*ito 0

I know this may be a bit late, but it is worth contributing. I managed to solve a similar problem using SequenceInputStream.

The trick is to compute the total size of the resulting file, and then build the SequenceInputStream from an Enumeration<InputStream>.

Here is some sample code that might help:

public void combineFiles() {
    List<String> files = getFiles();
    long totalFileSize = files.stream()
                              .mapToLong(this::getContentLength)
                              .sum();

    try (InputStream partialFile = new SequenceInputStream(getInputStreamEnumeration(files))) {
        ObjectMetadata resultFileMetadata = new ObjectMetadata();
        resultFileMetadata.setContentLength(totalFileSize);
        s3Client.putObject("bucketName", "resultFilePath", partialFile, resultFileMetadata);
    } catch (IOException e) {
        LOG.error("An error occurred while combining files.", e);
    }
}

private Enumeration<? extends InputStream> getInputStreamEnumeration(List<String> files) {
    return new Enumeration<InputStream>() {
        private final Iterator<String> fileNamesIterator = files.iterator();

        @Override
        public boolean hasMoreElements() {
            return fileNamesIterator.hasNext();
        }

        @Override
        public InputStream nextElement() {
            try {
                return new FileInputStream(Paths.get(fileNamesIterator.next()).toFile());
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    };
}
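As a quick sanity check of the SequenceInputStream approach, here is a self-contained sketch that concatenates two in-memory streams in order (no S3 involved; the class and method names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class SequenceDemo {
    // Concatenate the given strings by streaming them through a SequenceInputStream,
    // the same mechanism used to feed multiple files into a single putObject call.
    static String concat(List<String> chunks) {
        List<InputStream> streams = chunks.stream()
                .map(s -> (InputStream) new ByteArrayInputStream(s.getBytes()))
                .collect(Collectors.toList());
        try (InputStream in = new SequenceInputStream(Collections.enumeration(streams))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The streams are consumed one after another in enumeration order, which is why the file-name list's order is preserved in the combined object.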

Hope this helps!