优化 Spring Boot 中的数据获取和插入

Ele*_*ian 4 database bulkinsert sql-insert spring-data-jpa spring-boot

我的 CSV 文件中有 270000 条记录,其中包含 user_id、book_ISBN 和 book_ rating 列,我需要将这些记录插入到多对多表中。我使用 openCSV 库解析数据,结果是一个列表。

public List<UserRatingDto> uploadRatings(MultipartFile file) throws IOException{
        BufferedReader fileReader = new BufferedReader(new
                InputStreamReader(file.getInputStream(), "UTF-8"));

        List<UserRatingDto> ratings = new CsvToBeanBuilder<UserRatingDto>(fileReader)
                .withType(UserRatingDto.class)
                .withSeparator(';')
                .withIgnoreEmptyLine(true)
                .withSkipLines(1)
                .build()
                .parse();
        return ratings;
    }
Run Code Online (Sandbox Code Playgroud)

这不存在性能问题,解析大约需要 1 分钟。但是,为了将它们插入到表中,我需要从数据库中获取书籍和用户以形成关系,我尝试使该方法与 @Async 注释异步,我尝试了并行流,我尝试将对象放入放入堆栈并使用 saveAll() 批量插入,但仍然花费太多时间。

 public void saveRatings(final MultipartFile file) throws IOException{
        List<UserRatingDto> userRatingDtos = uploadRatings(file);

        userRatingDtos.parallelStream().forEach(bookRating->{
            UserEntity user = userRepository.findByUserId(bookRating.getUserId());
            bookRepository.findByISBN(bookRating.getBookISBN()).ifPresent(book -> {
                BookRating bookRating1 = new BookRating();
                bookRating1.setRating(bookRating.getBookRating());
                bookRating1.setUser(user);
                bookRating1.setBook(book);
                book.getRatings().add(bookRating1);
                user.getRatings().add(bookRating1);
                bookRatingRepository.save(bookRating1);
            });

        });
}
Run Code Online (Sandbox Code Playgroud)

这就是我现在所拥有的,有什么我可以改变以使其更快吗?

Tom*_*des 6

问题是数据是被一一获取和保存的。访问数据的最有效方法通常是well defined batches,然后遵循以下模式:

  • 获取处理批次所需的数据
  • 处理内存中的批次
  • 在获取下一批之前保留处理结果

对于您的特定用例,您可以执行以下操作:

    public void saveRatings(final MultipartFile file) throws IOException {
        List<UserRatingDto> userRatingDtos = uploadRatings(file);

        // Split the list into batches
        getBatches(userRatingDtos, 100).forEach(this::processBatch);
    }

    private void processBatch(List<UserRatingDto> userRatingBatch) {
        
        // Retrieve all data required to process a batch
        Map<String, UserEntity> users = userRepository
                .findAllById(userRatingBatch.stream().map(UserRatingDto::getUserId).toList())
                .stream()
                .collect(toMap(UserEntity::getId, user -> user));
        Map<String, Book> books = bookRepository.findAllByIsbn(userRatingBatch.stream().map(UserRatingDto::getBookISBN).toList())
                .stream()
                .collect(toMap(Book::getIsbn, book -> book));

        // Process each rating in memory
        List<BookRating> ratingsToSave = userRatingBatch.stream().map(bookRatingDto -> {
            Book book = books.get(bookRatingDto.getBookISBN());
            if (book == null) {
                return null;
            }
            UserEntity user = users.get(bookRatingDto.getUserId());
            BookRating bookRating = new BookRating();
            bookRating.setRating(bookRatingDto.getBookRating());
            bookRating.setUser(user);
            bookRating.setBook(book);
            book.getRatings().add(bookRating);
            user.getRatings().add(bookRating);
            return bookRating;
        }).filter(Objects::nonNull).toList();

        // Save data in batches
        bookRatingRepository.saveAll(ratingsToSave);
        bookRepository.saveAll(books.values());
        userRepository.saveAll(users.values());

    }

    public <T> List<List<T>> getBatches(List<T> collection, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < collection.size(); i += batchSize) {
            batches.add(collection.subList(i, Math.min(i + batchSize, collection.size())));
        }
        return batches;
    }

Run Code Online (Sandbox Code Playgroud)

请注意,所有 I/O 应始终批量完成。如果您有单个数据库查找或保存在内部处理循环中,则这根本不起作用。

您可以尝试不同的方式batch sizes来看看什么会带来更好的性能 - 批次越大,事务保持打开状态的时间就越长,并且并不总是更大的批次会带来更好的整体性能。

另外,请确保妥善处理错误 - 例如:

  • 如果某个批次抛出错误,您可以将此类批次分成两部分,依此类推,直到只有一个评级失败。
  • 如果存在数据库访问问题,您还可以通过退避重试失败的批次。
  • 例如,如果您的必填字段为空,您可以放弃评级

编辑:根据 OP 的评论,这将性能提高了 10 倍以上。此外,如果顺序不重要,通过并行处理每个批次仍然可以大大提高性能。

编辑2:作为一般模式,理想情况下我们不会一开始就将所有记录都存储在内存中,而是检索要批量处理的数据。这将进一步提高性能并避免 OOM 错误。

此外,这可以在许多并发模式中完成,例如使用专用线程来获取数据,使用工作线程来处理数据,以及使用另一组线程来保存结果。

最简单的模式是让每个工作单元都是独立的——它们被赋予了应该处理的内容(例如,从数据库中获取的一组ID),然后检索处理所需的数据,在内存中处理它,并保存结果。