Spring Batch 慢速读写

Son*_*wal 2 spring spring-batch spring-boot

我有一个批处理作业从 SQLServer 读取记录并写入 MariaDB。即使我在批处理中实现了分区的概念,该过程也很慢

下面是源和目标系统的数据源配置。

@Bean(name = "sourceSqlServerDataSource")
    public DataSource mysqlDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setMaximumPoolSize(100);
        hikariDataSource.setUsername(username);
        hikariDataSource.setPassword(password);
        hikariDataSource.setJdbcUrl(jdbcUrl);
        hikariDataSource.setDriverClassName(driverClassName);
        hikariDataSource.setPoolName("Source-SQL-Server");
        return hikariDataSource;
    } 

    @Bean(name = "targetMySqlDataSource")
    @Primary
    public DataSource mysqlDataSource() {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setMaximumPoolSize(100);
        hikariDataSource.setUsername(username);
        hikariDataSource.setPassword(password);
        hikariDataSource.setJdbcUrl(jdbcUrl);
        hikariDataSource.setDriverClassName(driverClassName);
        hikariDataSource.setPoolName("Target-Myql-Server");
        return hikariDataSource;
    }
Run Code Online (Sandbox Code Playgroud)

下面是配置的 My Bean 和线程池 taskexecutor

@Bean(name = "myBatchJobsThreadPollTaskExecutor")
    public ThreadPoolTaskExecutor initializeThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setThreadNamePrefix("My-Batch-Jobs-TaskExecutor ");
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        threadPoolTaskExecutor.initialize();
        log.info("Thread Pool Initialized with min {} and Max {} Pool Size",threadPoolTaskExecutor.getCorePoolSize(),threadPoolTaskExecutor.getMaxPoolSize() );
        return threadPoolTaskExecutor;
    }
Run Code Online (Sandbox Code Playgroud)

这里是配置的步骤和分区步骤

@Bean(name = "myMainStep")
    public Step myMainStep() throws Exception{
        return stepBuilderFactory.get("myMainStep").chunk(500)
                .reader(myJdbcReader(null,null))
                .writer(myJpaWriter()).listener(chunkListener)
                .build();
    }

    @Bean
    public Step myPartitionStep() throws Exception {
        return stepBuilderFactory.get("myPartitionStep").listener(myStepListener)
                .partitioner(myMainStep()).partitioner("myPartition",myPartition)
                .gridSize(50).taskExecutor(asyncTaskExecutor).build();
    }
Run Code Online (Sandbox Code Playgroud)

使用读者和作者更新帖子

@Bean(name = "myJdbcReader")
    @StepScope
    public JdbcPagingItemReader myJdbcReader(@Value("#{stepExecutionContext[parameter1]}") Integer parameter1, @Value("#{stepExecutionContext[parameter2]}") Integer parameter2) throws Exception{
        JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReader();
        jdbcPagingItemReader.setDataSource(myTargetDataSource);
        jdbcPagingItemReader.setPageSize(500);
        jdbcPagingItemReader.setRowMapper(myRowMapper());
        Map<String,Object> paramaterMap=new HashMap<>();
        paramaterMap.put("parameter1",parameter1);
        paramaterMap.put("parameter2",parameter2);
        jdbcPagingItemReader.setQueryProvider(myQueryProvider());
        jdbcPagingItemReader.setParameterValues(paramaterMap);
        return jdbcPagingItemReader;
    }

    @Bean(name = "myJpaWriter")
    public ItemWriter myJpaWriter(){
        JpaItemWriter<MyTargetTable> targetJpaWriter = new JpaItemWriter<>();
        targetJpaWriter.setEntityManagerFactory(localContainerEntityManagerFactoryBean.getObject());
        return targetJpaWriter;
    }
Run Code Online (Sandbox Code Playgroud)

有人可以了解如何使用 Spring 批处理提高读写性能吗?

Mah*_*ine 5

提高此类应用程序的性能取决于多个参数(网格大小、块大小、页面大小、线程池大小、数据库连接池大小、数据库服务器和 JVM 之间的延迟等)。所以我不能给你一个准确的答案,但我会尝试提供一些指导方针:

  • 在开始提高性能之前,您需要明确定义基线 + 目标。说“它很慢”是没有意义的。至少准备一个 JVM 分析器和带有查询执行计划分析器的 SQL 客户端。这些是在您的 JVM 或数据库上找到性能瓶颈所必需的。
  • 将网格大小设置为 50 并使用核心大小为 100 的线程池意味着将创建 50 个线程但不会使用。确保您使用的是线程池任务执行程序,.taskExecutor(asyncTaskExecutor)而不是SimpleAsyncTaskExecutor不重用线程的线程池任务执行程序。
  • 25 万条记录的 50 个分区对我来说似乎很多。每个分区将有 5000 条记录,每个分区将产生 10 个事务(因为 chunkSize = 500)。因此,两个数据库服务器和 JVM 之间将有 10 个事务 x 50 个分区 = 500 个事务。这可能是一个性能问题。我建议从较少的分区开始,例如 5 或 10。增加并发并不一定意味着提高性能。总有一个收支平衡点,您的应用程序将花费更多时间在上下文切换和处理并发上,而不是处理其业务逻辑。找到那个点是一个经验过程。
  • 我会首先在任何 Spring Batch 作业之外运行任何 sql 查询,以查看查询本身是否存在性能问题(查询获取太多列、太多记录等)或 db 模式(例如缺少索引)
  • 我不会将 JPA/Hibernate 用于这样的 ETL 工作。将数据映射到域对象可能很昂贵,尤其是在 O/R 映射未优化的情况下。在这些情况下,原始 JDBC 通常更快。

还有很多其他技巧,例如估计内存中的项目大小并确保内存中的总块大小小于堆大小以避免块内不必要的 GC,为批处理应用程序选择正确的 GC 算法等,但这些在某种程度上是先进的. 上面的指南列表是 IMO 的一个很好的起点。

希望这可以帮助!