Spring Batch with Spring Boot terminates before child threads finish when using AsyncItemProcessor

Die*_*ães 2 spring asynchronous spring-integration spring-batch spring-boot

I am using Spring Batch with an AsyncItemProcessor, and things are behaving unexpectedly. Let me show the code first:

Following the simple example shown in the Spring Batch project:

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
    public static void main(String[] args) throws Exception {// NOSONAR
        System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        //SpringApplication.run(PerfilEletricoApp.class, args);
    }
}

-- EDIT

If I just make the main thread sleep for a few seconds, giving slf4j time to flush its logs, everything works as expected.

@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {

    public static void main(String[] args) throws Exception {// NOSONAR
        //System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
        ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);

        Thread.sleep(1000 * 5);
        System.exit(SpringApplication.exit(context));
    }
}

-- END EDIT
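The fixed five-second sleep only helps if every worker thread happens to finish inside that window. As a rough illustration of waiting for the work itself rather than for the clock, here is a minimal plain-Java sketch (no Spring involved). The class `AwaitInsteadOfSleepSketch` and the method `runAndAwait` are hypothetical names made up for this example, and the 20 ms sleep merely stands in for the HTTP GET and logging done by the real processor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait for a thread pool to drain instead of sleeping a fixed time.
class AwaitInsteadOfSleepSketch {

    // Submits `tasks` short jobs, waits for the pool to drain, and returns how many finished.
    static int runAndAwait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // stand-in for the HTTP GET plus logging
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            });
        }

        pool.shutdown(); // accept no new work; already queued tasks still run
        try {
            // Block until all queued tasks are done, or give up after 30 seconds.
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(20));
    }
}
```

This only shows why the sleep appears to fix things: the main thread was exiting while pool threads were still busy.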

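I am reading a text file with fields, then using AsyncItemProcessor for multithreaded processing, which consists of an HTTP GET on a URL to fetch some data. I also use a NoOpWriter so that the write phase does nothing. The result of each GET is recorded in the processor part of the job (using log.trace / log.warn).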

@Configuration
public class HttpClientConfigurer {
    // [... property and configs omitted] 
    @Bean
    public CloseableHttpClient createHttpClient() {
      // ... creates and returns a poolable http client etc
    }
}

As for the job:

@Configuration
public class BatchJobConfigurer {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("${async.tps:10}")
    private Integer tps;

    @Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
    private String sourceDir;

    @Bean
    public ItemReader<String> reader() {
        MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
        reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
        reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
        return reader;
    }

    @Bean
    public ItemReader<String> flatItemReader() {
        FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
        itemReader.setLineMapper(new DefaultLineMapper<String>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "sample-field-001"});
            }});
            setFieldSetMapper(new SimpleStringFieldSetMapper<>());
        }});
        return itemReader;
    }


    @Bean
    public ItemProcessor asyncItemProcessor(){
        AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(processor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<String,OiPaggoResponse> processor(){
        return new PerfilEletricoItemProcessor();
    }

    /**
     * Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
     * @return a NoOpItemWriter<OiPaggoResponse>
     */
    @Bean
    public ItemWriter<OiPaggoResponse> writer() {
        return new NoOpItemWriter<>();
    }

    @Bean
    protected Step step1() throws Exception {
/*
 The problem starts here: if I use processor(), everything ends nicely, but if I insist on asyncItemProcessor(), the job ends and the logs from the processor are not written to disk.
*/
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())   
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
    }

    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(tps);
        executor.setMaxPoolSize(tps);
        executor.setQueueCapacity(tps * 1000);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("AsyncExecutor-");
        return executor;
    }
}

-- UPDATE with AsyncItemWriter (working version)

    /* Wrapped writer */
    @Bean
    public ItemWriter asyncItemWriter(){
        AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /*AsyncItemWriter defined on the steps*/
    @Bean
    protected Step step1() throws Exception {
        return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

-- Any ideas why AsyncItemProcessor does not wait for all of its children to finish before sending the OK-Completed signal to the context?

Mic*_*lla 6

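The problem is that the AsyncItemProcessor is creating Futures that no one is waiting for. Wrap your NoOpItemWriter in an AsyncItemWriter so that something waits for the Futures. That will cause the job to complete as expected.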
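To make the mechanism concrete, here is a minimal plain-Java sketch (no Spring involved) of the same hand-off. The class `FutureUnwrapSketch` and its methods `processAsync`, `unwrap`, `slowUpper`, and `runChunk` are hypothetical names made up for illustration. They only mimic the roles that AsyncItemProcessor and AsyncItemWriter play and are not Spring Batch's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of the processor/writer hand-off using only java.util.concurrent.
class FutureUnwrapSketch {

    // Role of the async processor: submit each item and return immediately with Futures.
    static <I, O> List<Future<O>> processAsync(List<I> items, Function<I, O> delegate,
                                               ExecutorService pool) {
        List<Future<O>> futures = new ArrayList<>();
        for (I item : items) {
            Callable<O> task = () -> delegate.apply(item);
            futures.add(pool.submit(task));
        }
        return futures;
    }

    // Role of the async writer: block on every Future before the chunk counts as done.
    static <O> List<O> unwrap(List<Future<O>> futures) {
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            try {
                results.add(future.get()); // this wait is what the original step was missing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("delegate failed", e.getCause());
            }
        }
        return results;
    }

    // Stand-in for a slow delegate such as an HTTP call.
    static String slowUpper(String s) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.toUpperCase();
    }

    // Processes one chunk asynchronously, then waits for all results in submission order.
    static List<String> runChunk(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = processAsync(items, FutureUnwrapSketch::slowUpper, pool);
            return unwrap(futures);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChunk(Arrays.asList("a", "b", "c")));
    }
}
```

If `unwrap` were never called, `runChunk` would return as soon as the tasks had been submitted, which mirrors a step that uses the async processor without an AsyncItemWriter.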

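  • Michael, that looks so simple, thank you very much. I had created the NoOpWriter and never stopped to think about what the AsyncItemProcessor was creating. Thanks a million. (2 upvotes)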