小编Edd*_*nne的帖子

将数据传递给未来的步骤 - Spring Batch

正如我们在 Spring Batch 官方文档中看到的那样,“将数据传递到未来的步骤 - Spring Batch”是可能的,但我们大多数人一直在努力解决它,因为在官方文档中他们提到了两种可能性。一个台阶级别,一个工作级别。问题是如何检索步骤级别的数据?

我的解决方案与官方文档中的相关解决方案相同,但它不起作用。所以我决定做以下事情:

1-为促销监听器创建bean:

<beans:bean id="promotionListener"
                class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
        <beans:property name="keys" value="someValues"/>
</beans:bean>
Run Code Online (Sandbox Code Playgroud)

2-在您的步骤中设置监听器(您希望保存数据的步骤)

        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
Run Code Online (Sandbox Code Playgroud)

3- 在写入器中的同一步骤(将保存数据的步骤)中,保存数据。

private StepExecution stepExecution;

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;

        ExecutionContext executionContext = stepExecution.getExecutionContext();
        Map<String, String> fieldsMap= new HashMap<>();
        executionContext.put("keys", someValues);
    }

@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
    LOGGER.info(items.toString());

    Map<String, String> fieldsMap= new ConcurrentHashMap<>();
    items.forEach(item -> item.forEach(fieldsMap::put));

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("keys", fieldsMap);
}
Run Code Online (Sandbox Code Playgroud)

您可以看到,在我的例子中,我将数据保存在 Map …

spring batch-processing spring-batch

7
推荐指数
1
解决办法
1万
查看次数

@BeforeStep 未在 AsyncProcessor 中调用

我一直在使用同步 ItemProcessor 和 Writer,但现在我将其移至异步,如下代码所示:

@Bean
public Job importFraudCodeJob(Step computeFormFileToDB) {
    return jobBuilderFactory.get("Import-Entities-Risk-Codes")
            .incrementer(new RunIdIncrementer())
            .listener(notificationExecutionListener)
            .start(computeFormFileToDB)
            .build();
}
@Bean
public Step computeFormFileToDB(ItemReader<EntityRiskCodesDto> entityRiskCodeFileReader) {
    return stepBuilderFactory.get("ImportFraudCodesStep")
            .<EntityFraudCodesDto, Future<EntityFraudCodes>>chunk(chunkSize)
            .reader(entityRiskCodeFileReader)
            .processor(asyncProcessor())
            .writer(asyncWriter())
            .faultTolerant()
            .skipPolicy(customSkipPolicy)
            .listener(customStepListener)
            .listener(chunkCounterListener())
            .taskExecutor(taskExecutor())
            .throttleLimit(6)
            .build();
}
Run Code Online (Sandbox Code Playgroud)

在我的 ItemPocessor<I,O> 中,我使用 @BeforeStep 来获取我存储在 StepExecutionContext 中的值:

@BeforeStep
public  void getKey(StepExecution stepExecution) {
    log.info("Fetching batchNumber");
    ExecutionContext context = stepExecution.getExecutionContext();
    this.sequenceNumber = (Integer) context.get("sequenceNumber");
}
Run Code Online (Sandbox Code Playgroud)

这里是我的 AsyncProcessor 的声明:

  @Bean
public AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes> asyncProcessor() {
    var asyncItemProcessor = new AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes>();
    asyncItemProcessor.setDelegate(riskCodeItemProcessor());
    asyncItemProcessor.setTaskExecutor(taskExecutor()); …
Run Code Online (Sandbox Code Playgroud)

spring asynchronous batch-processing spring-batch spring-batch-tasklet

3
推荐指数
1
解决办法
1224
查看次数