Ele*_*ian 4 database bulkinsert sql-insert spring-data-jpa spring-boot
我的 CSV 文件中有 270000 条记录,其中包含 user_id、book_ISBN 和 book_ rating 列,我需要将这些记录插入到多对多表中。我使用 openCSV 库解析数据,结果是一个列表。
public List<UserRatingDto> uploadRatings(MultipartFile file) throws IOException{
BufferedReader fileReader = new BufferedReader(new
InputStreamReader(file.getInputStream(), "UTF-8"));
List<UserRatingDto> ratings = new CsvToBeanBuilder<UserRatingDto>(fileReader)
.withType(UserRatingDto.class)
.withSeparator(';')
.withIgnoreEmptyLine(true)
.withSkipLines(1)
.build()
.parse();
return ratings;
}
Run Code Online (Sandbox Code Playgroud)
这不存在性能问题,解析大约需要 1 分钟。但是,为了将它们插入到表中,我需要从数据库中获取书籍和用户以形成关系,我尝试使该方法与 @Async 注释异步,我尝试了并行流,我尝试将对象放入放入堆栈并使用 saveAll() 批量插入,但仍然花费太多时间。
public void saveRatings(final MultipartFile file) throws IOException{
List<UserRatingDto> userRatingDtos = uploadRatings(file);
userRatingDtos.parallelStream().forEach(bookRating->{
UserEntity user = userRepository.findByUserId(bookRating.getUserId());
bookRepository.findByISBN(bookRating.getBookISBN()).ifPresent(book -> {
BookRating bookRating1 = new BookRating();
bookRating1.setRating(bookRating.getBookRating());
bookRating1.setUser(user);
bookRating1.setBook(book);
book.getRatings().add(bookRating1);
user.getRatings().add(bookRating1);
bookRatingRepository.save(bookRating1);
});
});
}
Run Code Online (Sandbox Code Playgroud)
这就是我现在所拥有的,有什么我可以改变以使其更快吗?
问题是数据是被一一获取和保存的。访问数据的最有效方法通常是well defined batches,然后遵循以下模式:
对于您的特定用例,您可以执行以下操作:
public void saveRatings(final MultipartFile file) throws IOException {
List<UserRatingDto> userRatingDtos = uploadRatings(file);
// Split the list into batches
getBatches(userRatingDtos, 100).forEach(this::processBatch);
}
private void processBatch(List<UserRatingDto> userRatingBatch) {
// Retrieve all data required to process a batch
Map<String, UserEntity> users = userRepository
.findAllById(userRatingBatch.stream().map(UserRatingDto::getUserId).toList())
.stream()
.collect(toMap(UserEntity::getId, user -> user));
Map<String, Book> books = bookRepository.findAllByIsbn(userRatingBatch.stream().map(UserRatingDto::getBookISBN).toList())
.stream()
.collect(toMap(Book::getIsbn, book -> book));
// Process each rating in memory
List<BookRating> ratingsToSave = userRatingBatch.stream().map(bookRatingDto -> {
Book book = books.get(bookRatingDto.getBookISBN());
if (book == null) {
return null;
}
UserEntity user = users.get(bookRatingDto.getUserId());
BookRating bookRating = new BookRating();
bookRating.setRating(bookRatingDto.getBookRating());
bookRating.setUser(user);
bookRating.setBook(book);
book.getRatings().add(bookRating);
user.getRatings().add(bookRating);
return bookRating;
}).filter(Objects::nonNull).toList();
// Save data in batches
bookRatingRepository.saveAll(ratingsToSave);
bookRepository.saveAll(books.values());
userRepository.saveAll(users.values());
}
public <T> List<List<T>> getBatches(List<T> collection, int batchSize) {
List<List<T>> batches = new ArrayList<>();
for (int i = 0; i < collection.size(); i += batchSize) {
batches.add(collection.subList(i, Math.min(i + batchSize, collection.size())));
}
return batches;
}
Run Code Online (Sandbox Code Playgroud)
请注意,所有 I/O 应始终批量完成。如果您有单个数据库查找或保存在内部处理循环中,则这根本不起作用。
您可以尝试不同的方式batch sizes来看看什么会带来更好的性能 - 批次越大,事务保持打开状态的时间就越长,并且并不总是更大的批次会带来更好的整体性能。
另外,请确保妥善处理错误 - 例如:
编辑:根据 OP 的评论,这将性能提高了 10 倍以上。此外,如果顺序不重要,通过并行处理每个批次仍然可以大大提高性能。
编辑2:作为一般模式,理想情况下我们不会一开始就将所有记录都存储在内存中,而是检索要批量处理的数据。这将进一步提高性能并避免 OOM 错误。
此外,这可以在许多并发模式中完成,例如使用专用线程来获取数据,使用工作线程来处理数据,以及使用另一组线程来保存结果。
最简单的模式是让每个工作单元都是独立的——它们被赋予了应该处理的内容(例如,从数据库中获取的一组ID),然后检索处理所需的数据,在内存中处理它,并保存结果。
| 归档时间: |
|
| 查看次数: |
4319 次 |
| 最近记录: |