Spring Batch - How do I read and write data in parallel using partitioning?

Asked by Luc*_*ano · Tags: spring, multithreading, spring-batch

I have a Spring Batch application (3.0.7), launched via Spring Boot, that reads multiple XML files in parallel, processes them, and issues INSERT or UPDATE statements against an Oracle DB.

To process the files in parallel I am using a Partitioner. The job works fine, except that the JdbcWriter appears to be bound to a single thread. Since I am using a ThreadPoolTaskExecutor, I expected the step to run the reader, processor, and writer in parallel. However, the JdbcWriter always seems to be bound to Thread-1 (I can see this in the logs, and profiling the database connections shows only one active connection; note that my data source is configured with a pool of 20 connections).

I have annotated the reader, processor, and writer as @StepScope. How can I effectively use all the threads configured on the taskExecutor to read and write in parallel?

Here is an excerpt from my configuration:

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep())
            .partitioner("recordStep", recordPartitioner(null)) // this is used to inject some data from the job context
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) // this is used to inject some data from the job context
            .processor(recordProcessor) // this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    taskExecutor.setMaxPoolSize(threadPoolSize);
    taskExecutor.afterPropertiesSet();

    return taskExecutor;
}

Answered by Luc*_*ano · 5 votes

I figured out why Spring Batch was not using all the configured threads.

First, the Spring configuration of the Partitioner was wrong. The original configuration did not set the gridSize value and incorrectly referenced the step to be run in the partitions.

Second, the ThreadPoolTaskExecutor used in the original configuration did not seem to work. Switching to a SimpleAsyncTaskExecutor did.

I am still not sure why the ThreadPoolTaskExecutor did not work, especially since the javadoc for SimpleAsyncTaskExecutor actually recommends using a pool in order to reuse threads.
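One plausible explanation (an editorial note, not from the original answer): Spring's ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor with a default core pool size of 1 and an unbounded queue, and a ThreadPoolExecutor only grows beyond its core size when the queue is full. Setting only maxPoolSize, as the original configuration did, therefore never creates a second thread; extra partitions just queue up. The stdlib-only demo below reproduces that behavior:

```java
import java.util.concurrent.*;

public class PoolDemo {
    public static void main(String[] args) throws Exception {
        // Mirrors ThreadPoolTaskExecutor's effective defaults when only
        // maxPoolSize is set: core size 1, max 20, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        // Submit 10 long-ish tasks; with an unbounded queue, 9 of them
        // queue up instead of triggering new threads.
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try { Thread.sleep(200); } catch (InterruptedException e) { }
            });
        }
        Thread.sleep(100);

        // The pool never grows past its core size of 1.
        System.out.println("active threads: " + pool.getPoolSize()); // prints 1
        pool.shutdownNow();
    }
}
```

If this is the cause, setting the core pool size (not only the max) would presumably have made the original pooled executor behave.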

I am also not 100% sure I fully understand the implications of the gridSize value. For now I set gridSize equal to the number of threads used by the partitioned step. It would be great if someone (@Michael Minella?) could comment on this approach. :)
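For illustration, the relationship between gridSize and partitions can be sketched in plain Java (hypothetical helper names, not from the original post): a custom Partitioner's partition(gridSize) method typically returns one execution context per partition, and Spring Batch runs one worker step execution per partition. Distributing the input files round-robin over gridSize partitions, with gridSize equal to the thread count, keeps every thread busy:

```java
import java.util.*;

// Hypothetical sketch of the bucketing a file-based Partitioner
// might perform: each file lands in one of gridSize partitions.
public class PartitionSketch {

    static Map<String, List<String>> partition(List<String> files, int gridSize) {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < files.size(); i++) {
            // Round-robin assignment, keyed like "partition0".."partitionN".
            partitions.computeIfAbsent("partition" + (i % gridSize),
                    k -> new ArrayList<>()).add(files.get(i));
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(partition(
                List.of("a.xml", "b.xml", "c.xml", "d.xml", "e.xml"), 2));
        // prints {partition0=[a.xml, c.xml, e.xml], partition1=[b.xml, d.xml]}
    }
}
```

In a real Partitioner each map value would be stored in an ExecutionContext (e.g. under a key like "record-file") so the @StepScope reader can pick it up, as the configuration below does.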

Here is the correct configuration, for reference:

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep().getName(), recordPartitioner(null)) // the value for the recordPartitioner constructor is injected at runtime
            .step(recordStep())
            .gridSize(determineWorkerThreads()) // the grid size must equal the number of threads configured for the task executor
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) // this is used to inject some data from the job context
            .processor(recordProcessor) // this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");

    taskExecutor.setConcurrencyLimit(determineWorkerThreads());
    return taskExecutor;
}

// threadPoolSize is a configuration parameter for the job
private int determineWorkerThreads() {
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    return threadPoolSize;

}
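For completeness, here is a hedged sketch (assuming Spring's ThreadPoolTaskExecutor API, and untested against the original job) of how the pooled executor could probably be salvaged: the likely missing piece is the core pool size, since ThreadPoolTaskExecutor defaults corePoolSize to 1 with an unbounded queue, so raising only maxPoolSize never grows the pool.

```java
@Bean
public TaskExecutor taskExecutor() {
    // Sketch only: with corePoolSize set, the pool actually starts
    // enough threads; with only maxPoolSize set (as in the original
    // configuration), extra work queues on one thread instead.
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(determineWorkerThreads());
    taskExecutor.setMaxPoolSize(determineWorkerThreads());
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
```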