Die*_*ães 2 spring asynchronous spring-integration spring-batch spring-boot
我正在将 Spring Batch 与 AsyncItemProcessor 一起使用,并且事情的表现出乎意料。让我先展示一下代码:
按照Spring Batch 项目中所示的简单示例进行操作:
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
//SpringApplication.run(PerfilEletricoApp.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
- 编辑
如果我只是让主进程休眠,请给 slf4j 几秒钟的时间来写入刷新日志,一切都会按预期进行。
@EnableBatchProcessing
@SpringBootApplication
@Import({HttpClientConfigurer.class, BatchJobConfigurer.class})
public class PerfilEletricoApp {
public static void main(String[] args) throws Exception {// NOSONAR
//System.exit(SpringApplication.exit(SpringApplication.run(PerfilEletricoApp.class, args)));
ConfigurableApplicationContext context = SpringApplication.run(PerfilEletricoApp.class, args);
Thread.sleep(1000 * 5);
System.exit(SpringApplication.exit(context));
}
Run Code Online (Sandbox Code Playgroud)
}
-- 结束编辑
我正在读取一个带有字段的文本文件,然后使用 AsyncItemProcessor 进行多线程处理,其中包括 URL 上的 Http GET 以获取一些数据,我还使用 NoOpWriter 在写入部分不执行任何操作。我将 GET 的结果保存在作业的处理器部分(使用 log.trace / log.warn)。
@Configuration
public class HttpClientConfigurer {
// [... property and configs omitted]
@Bean
public CloseableHttpClient createHttpClient() {
// ... creates and returns a poolable http client etc
}
}
Run Code Online (Sandbox Code Playgroud)
至于工作:
@Configuration
public class BatchJobConfigurer {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Value("${async.tps:10}")
private Integer tps;
@Value("${com.bemobi.perfilelerico.sourcedir:/AppServer/perfil-eletrico/source-dir/}")
private String sourceDir;
@Bean
public ItemReader<String> reader() {
MultiResourceItemReader<String> reader = new MultiResourceItemReader<>();
reader.setResources( new Resource[] { new FileSystemResource(sourceDir)});
reader.setDelegate((ResourceAwareItemReaderItemStream<? extends String>) flatItemReader());
return reader;
}
@Bean
public ItemReader<String> flatItemReader() {
FlatFileItemReader<String> itemReader = new FlatFileItemReader<>();
itemReader.setLineMapper(new DefaultLineMapper<String>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "sample-field-001"});
}});
setFieldSetMapper(new SimpleStringFieldSetMapper<>());
}});
return itemReader;
}
@Bean
public ItemProcessor asyncItemProcessor(){
AsyncItemProcessor<String, OiPaggoResponse> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor());
asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<String,OiPaggoResponse> processor(){
return new PerfilEletricoItemProcessor();
}
/**
* Using a NoOpItemWriter<T> so we satisfy spring batch flow but don't use writer for anything else.
* @return a NoOpItemWriter<OiPaggoResponse>
*/
@Bean
public ItemWriter<OiPaggoResponse> writer() {
return new NoOpItemWriter<>();
}
@Bean
protected Step step1() throws Exception {
/*
Problem starts here, If Use the processor() everything ends nicely, but if I insist on the asyncItemProcessor(), the job ends and the logs from processor are not stored on the disk.
*/
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobs.get("consulta-perfil-eletrico").start(step1()).build();
}
@Bean(name = "asyncExecutor")
public TaskExecutor getAsyncExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(tps);
executor.setMaxPoolSize(tps);
executor.setQueueCapacity(tps * 1000);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("AsyncExecutor-");
return executor;
}
}
Run Code Online (Sandbox Code Playgroud)
-- 使用 AsyncItemWriter 更新(工作版本)
/*Wrapped Writer*/
@Bean
public ItemWriter asyncItemWriter(){
AsyncItemWriter<OiPaggoResponse> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer());
return asyncItemWriter;
}
/*AsyncItemWriter defined on the steps*/
@Bean
protected Step step1() throws Exception {
return this.steps.get("step1").<String, OiPaggoResponse> chunk(10)
.reader(reader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
Run Code Online (Sandbox Code Playgroud)
-- 关于为什么 AsyncItemProcessor 在向上下文发送 OK-Completed 信号之前不等待所有孩子完成的任何想法?
问题是AsyncItemProcessor正在创建Future没有人等待的 s。将您包裹NoOpItemWriter在其中,AsyncItemWriter以便有人在等待Futures。这将导致作业按预期完成。
| 归档时间: |
|
| 查看次数: |
2715 次 |
| 最近记录: |