如何提高性能,遍历130多个项目并将其上传到AWS S3

fsa*_*ama 9 java performance amazon-s3 java-stream aws-lambda

我必须迭代130多个数据传输对象,并且每次都会生成一个json以上传到AWS S3。

如果不进行任何改进,则整个过程大约需要90秒。我尝试使用lamba而不是lamba,两者的结果相同。

for(AbstractDTO dto: dtos) {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
}
Run Code Online (Sandbox Code Playgroud)
dtos.stream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});
Run Code Online (Sandbox Code Playgroud)

经过一番调查,我得出结论,processDTO方法运行每项大约需要0.650ms

我的第一个尝试是使用并行流,结果非常好,大约需要15秒才能完成整个过程:

dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
});
Run Code Online (Sandbox Code Playgroud)

但是我仍然需要减少时间。我研究了如何改善并行流,并发现了ForkJoinPool技巧:

ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
dtos.parallelStream().forEach(dto -> {
    try {
        processDTO(dealerCode, yearPeriod, monthPeriod, dto);
    } catch (FileAlreadyExistsInS3Exception e) {
        failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
    }
})).get();
forkJoinPool.shutdown();
Run Code Online (Sandbox Code Playgroud)

不幸的是,结果令我有些困惑。

  • 当PARALLELISM_NUMBER为8时,大约需要13秒才能完成整个过程。没有太大的改善。
  • 当PARALLELISM_NUMBER为16时,大约需要8秒才能完成整个过程。
  • 当PARALLELISM_NUMBER为32时,大约需要5秒钟才能完成整个过程。

所有测试都是使用邮递员的请求完成的,调用controller方法将最终迭代130个项目

我对5秒感到满意,使用32作为PARALLELISM_NUMBER,但我担心后果如何。

  • 保持32位还可以吗?
  • 理想的PARALLELISM_NUMBER是多少?
  • 决定其价值时,我必须牢记什么?

我在Mac 2.2GHZ I7上运行

sysctl hw.physicalcpu hw.logicalcp
hw.physicalcpu: 4
hw.logicalcpu: 8
Run Code Online (Sandbox Code Playgroud)

这是processDTO的作用:

private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
    String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
    String jsonFileName = dto.fileName() + JSON_TYPE;;
    String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
    uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}
Run Code Online (Sandbox Code Playgroud)
public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
    if (s3client.doesObjectExist(bucketName, fileName)) {
        throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
    }
    s3client.putObject(bucketName, fileName, fileContent);
}
Run Code Online (Sandbox Code Playgroud)

fsa*_*ama 0

感谢你们所有有用的建议和解释,我成功地将时间减少到了8 秒。

由于瓶颈是上传到aws s3,并且您提到了aws的非阻塞API,经过一番研究,我发现类TransferManager包含非阻塞上传。

传输管理类

因此,我没有使用 ForkJoinPool 来增加线程数量,而是保留了简单的并行流:

        dtos.parallelStream().forEach(dto -> {
            try {
                processDTO(dealerCode, yearPeriod, monthPeriod, dto);
            } catch (FileAlreadyExistsInS3Exception e) {
                failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
            }
        });
Run Code Online (Sandbox Code Playgroud)

uploadToS3Method 发生了一些变化,我没有使用AmazonS3 ,而是使用了TransferManager

public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
        if (s3client.doesObjectExist(bucketName, fileName)) {
            throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
        }
        InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(fileContent.getBytes().length);
        return transferManager.upload(bucketName, fileName, targetStream, metadata);
}
Run Code Online (Sandbox Code Playgroud)

这样,当调用上传时,它不会等待它完成,而是让另一个 DTO 被处理。当所有 DTO 处理完毕后,我检查它们的上传状态以查看可能的错误(在第一个 forEach 之外)