正如我们在 Spring Batch 官方文档中看到的那样,“将数据传递到未来的步骤 - Spring Batch”是可能的,但我们大多数人一直在努力解决它,因为在官方文档中他们提到了两种可能性。一个台阶级别,一个工作级别。问题是如何检索步骤级别的数据?
我的解决方案与官方文档中的相关解决方案相同,但它不起作用。所以我决定做以下事情:
1-为促销监听器创建bean:
<beans:bean id="promotionListener"
class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<beans:property name="keys" value="someValues"/>
</beans:bean>
Run Code Online (Sandbox Code Playgroud)
2-在您的步骤中设置监听器(您希望保存数据的步骤)
<listeners>
<listener ref="promotionListener"/>
</listeners>
Run Code Online (Sandbox Code Playgroud)
3- 在写入器中的同一步骤(将保存数据的步骤)中,保存数据。
private StepExecution stepExecution;
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
ExecutionContext executionContext = stepExecution.getExecutionContext();
Map<String, String> fieldsMap= new HashMap<>();
executionContext.put("keys", someValues);
}
@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
LOGGER.info(items.toString());
Map<String, String> fieldsMap= new ConcurrentHashMap<>();
items.forEach(item -> item.forEach(fieldsMap::put));
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("keys", fieldsMap);
}
Run Code Online (Sandbox Code Playgroud)
您可以看到,在我的例子中,我将数据保存在 Map …
我一直在使用同步 ItemProcessor 和 Writer,但现在我将其移至异步,如下代码所示:
@Bean
public Job importFraudCodeJob(Step computeFormFileToDB) {
return jobBuilderFactory.get("Import-Entities-Risk-Codes")
.incrementer(new RunIdIncrementer())
.listener(notificationExecutionListener)
.start(computeFormFileToDB)
.build();
}
@Bean
public Step computeFormFileToDB(ItemReader<EntityRiskCodesDto> entityRiskCodeFileReader) {
return stepBuilderFactory.get("ImportFraudCodesStep")
.<EntityFraudCodesDto, Future<EntityFraudCodes>>chunk(chunkSize)
.reader(entityRiskCodeFileReader)
.processor(asyncProcessor())
.writer(asyncWriter())
.faultTolerant()
.skipPolicy(customSkipPolicy)
.listener(customStepListener)
.listener(chunkCounterListener())
.taskExecutor(taskExecutor())
.throttleLimit(6)
.build();
}
Run Code Online (Sandbox Code Playgroud)
在我的 ItemPocessor<I,O> 中,我使用 @BeforeStep 来获取我存储在 StepExecutionContext 中的值:
@BeforeStep
public void getKey(StepExecution stepExecution) {
log.info("Fetching batchNumber");
ExecutionContext context = stepExecution.getExecutionContext();
this.sequenceNumber = (Integer) context.get("sequenceNumber");
}
Run Code Online (Sandbox Code Playgroud)
这里是我的 AsyncProcessor 的声明:
@Bean
public AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes> asyncProcessor() {
var asyncItemProcessor = new AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes>();
asyncItemProcessor.setDelegate(riskCodeItemProcessor());
asyncItemProcessor.setTaskExecutor(taskExecutor()); …Run Code Online (Sandbox Code Playgroud) spring asynchronous batch-processing spring-batch spring-batch-tasklet