fsa*_*ama 9 java performance amazon-s3 java-stream aws-lambda
我必须迭代130多个数据传输对象,并且每次都会生成一个json以上传到AWS S3。
如果不进行任何改进,则整个过程大约需要90秒。我尝试使用lamba而不是lamba,两者的结果相同。
for(AbstractDTO dto: dtos) {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
}
Run Code Online (Sandbox Code Playgroud)
dtos.stream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});
Run Code Online (Sandbox Code Playgroud)
经过一番调查,我得出结论,processDTO方法运行每项大约需要0.650ms。
我的第一个尝试是使用并行流,结果非常好,大约需要15秒才能完成整个过程:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});
Run Code Online (Sandbox Code Playgroud)
但是我仍然需要减少时间。我研究了如何改善并行流,并发现了ForkJoinPool技巧:
ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLELISM_NUMBER);
forkJoinPool.submit(() ->
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
})).get();
forkJoinPool.shutdown();
Run Code Online (Sandbox Code Playgroud)
不幸的是,结果令我有些困惑。
所有测试都是使用邮递员的请求完成的,调用controller方法将最终迭代130个项目
我对5秒感到满意,使用32作为PARALLELISM_NUMBER,但我担心后果如何。
我在Mac 2.2GHZ I7上运行
sysctl hw.physicalcpu hw.logicalcp
hw.physicalcpu: 4
hw.logicalcpu: 8
Run Code Online (Sandbox Code Playgroud)
这是processDTO的作用:
private void processDTO(int dealerCode, int yearPeriod, int monthPeriod, AbstractDTO dto) throws FileAlreadyExistsInS3Exception {
String flatJson = JsonFlattener.flatten(new JSONObject(dto).toString());
String jsonFileName = dto.fileName() + JSON_TYPE;;
String jsonFilePath = buildFilePathNew(dto.endpoint(), dealerCode, yearPeriod, monthPeriod, AWS_S3_JSON_ROOT_FOLDER);
uploadFileToS3(jsonFilePath + jsonFileName, flatJson);
}
Run Code Online (Sandbox Code Playgroud)
public void uploadFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
s3client.putObject(bucketName, fileName, fileContent);
}
Run Code Online (Sandbox Code Playgroud)
感谢你们所有有用的建议和解释,我成功地将时间减少到了8 秒。
由于瓶颈是上传到aws s3,并且您提到了aws的非阻塞API,经过一番研究,我发现类TransferManager包含非阻塞上传。
因此,我没有使用 ForkJoinPool 来增加线程数量,而是保留了简单的并行流:
dtos.parallelStream().forEach(dto -> {
try {
processDTO(dealerCode, yearPeriod, monthPeriod, dto);
} catch (FileAlreadyExistsInS3Exception e) {
failedToUploadDTOs.add(e.getLocalizedMessage() + ": " + dto.fileName() + ".json");
}
});
Run Code Online (Sandbox Code Playgroud)
uploadToS3Method 发生了一些变化,我没有使用AmazonS3 ,而是使用了TransferManager:
public Upload uploadAsyncFileToS3(String fileName, String fileContent) throws FileAlreadyExistsInS3Exception {
if (s3client.doesObjectExist(bucketName, fileName)) {
throw new FileAlreadyExistsInS3Exception(ErrorMessages.FILE_ALREADY_EXISTS_IN_S3.getMessage());
}
InputStream targetStream = new ByteArrayInputStream(fileContent.getBytes());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(fileContent.getBytes().length);
return transferManager.upload(bucketName, fileName, targetStream, metadata);
}
Run Code Online (Sandbox Code Playgroud)
这样,当调用上传时,它不会等待它完成,而是让另一个 DTO 被处理。当所有 DTO 处理完毕后,我检查它们的上传状态以查看可能的错误(在第一个 forEach 之外)
| 归档时间: |
|
| 查看次数: |
213 次 |
| 最近记录: |