如何重新启动失败的弹簧批处理作业并让它从中断处继续?

Tra*_*zed 9 spring spring-batch

根据Spring Batch文档,开箱即用的重新启动作业,但我无法从它离开的位置开始.例如,如果我的步骤处理了10个记录,它应该从记录11开始,每当我重新启动它时进行处理 在实践中,这不会发生.它从beginnen中读取并重新处理所有内容.

是否有人使用基于Java配置的简单作业配置来读取分隔文件并将内容写入可以从停止点重新启动的db表?

@Configuration
public class BatchConfiguration {

    @Value("${spring-batch.databaseType}")
    private String databaseType;

    @Value("${spring-batch.databaseSchema}")
    private String schemaName;

    @Bean
    public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
        final PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    @Bean
    public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) {

        final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDatabaseType(databaseType);
        bean.setDataSource(dataSource);
        if (StringUtils.isNotBlank(schemaName)) {
            bean.setTablePrefix(schemaName);
        }
        bean.setTransactionManager(transactionManager);
        try {
            bean.afterPropertiesSet();
            return bean.getObject();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
        }
    }

    @Bean
    public JobLauncher jobLauncher(final JobRepository jobRepository) {

        final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

}

@Configuration
@EnableScheduling
@ComponentScan("com.some.package")
public class BatchJobConfiguration {

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Value("${savings-transaction.file}")
    private String savingsTransactionFile;

    @Value("${savings-balance.file}")
    private String savingsBalanceFile;

    @Value("${processed-directory}")
    private String processedDirectory;

    private static final Integer IMPORT_CHUNKSIZE = 10;

    @Bean
    @DependsOn("stepBuilderFactory")
    public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
            @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
            @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
            @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
            @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {

        return stepBuilderFactory.get("savingsTransactionStep")
                .transactionManager(transactionManager)
                .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE)
                .reader(savingsTransactionItemReader)
                .processor(processor)
                .writer(savingsTransactionItemWriter)
                .listener(listener)
                .build();
    }

    @Bean
    public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
            final JobRepository jobRepository) {

        final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");

        final FileMovingTasklet tasklet = new FileMovingTasklet();
        tasklet.setFileNamePattern(savingsTransactionFile);
        tasklet.setProcessedDirectory(processedDirectory);
        taskletStep.setTasklet(tasklet);
        taskletStep.setTransactionManager(transactionManager);
        taskletStep.setJobRepository(jobRepository);
        try {
            taskletStep.afterPropertiesSet();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Failed to configure tasklet!", e);
        }

        return taskletStep;
    }

    @Bean
    @DependsOn("jobBuilderFactory")
    public Job job(final Step savingsTransactionStep,
            final Step savingsTransactionCleanUpStep) {

        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(savingsTransactionStep)  
                .next(savingsTransactionCleanUpStep)                    
                .on("FINISHED")
                .end()
                .build()
                .build();
    }
}
Run Code Online (Sandbox Code Playgroud)

重新启动作业的单元测试代码

    final Date now = new Date();
    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
    assertEquals(9, savingsBalances.size());

    FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
    FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));

    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
    System.out.println(savingsBalances2.size());
    int found = 0;
    for (final SavingsBalance savingsBalance : savingsBalances2) {

        final String id = savingsBalance.getId();
        if ("12345".equals(id)) {
            found++;
        }

    }

    assertEquals("Invalid number of found balances!", 1, found);
Run Code Online (Sandbox Code Playgroud)

工作经理的实施

public class JobManager {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private Job job;

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void processRegistrations(final Date date) {

        try {

            final Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("START_DATE", new JobParameter(date));

            final JobParameters jobParameters = new JobParameters(parameters);
            final JobExecution execution = jobLauncher.run(job, jobParameters);
            LOG.info("Exit Status : " + execution.getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOG.error("Failed to process registrations.", e);
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

Tra*_*zed 5

您似乎必须配置以下bean才能重新启动作业.

@Bean
public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
        final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
    final SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(jobLauncher);
    jobOperator.setJobRepository(jobRepository);
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    return jobOperator;
}

@Bean
public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
    final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
    bean.setDataSource(dataSource);
    bean.setTablePrefix("BATCH_");
    bean.setJdbcOperations(new JdbcTemplate(dataSource));
    bean.afterPropertiesSet();
    return bean.getObject();
}
Run Code Online (Sandbox Code Playgroud)

然后,您需要从批处理表中检索批处理实例ID,以便能够使用jobOperator重新启动该特定实例.

final Long restartId = jobOperator.restart(id);
final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
Run Code Online (Sandbox Code Playgroud)

  • 我有一个非常基本的问题。在哪里编写上面两行代码。我有“ScheduledTaskRegistrar”,其中,我正在调用方法“scheduleJobA()”,并且该方法启动作业。并且`beforeJob()`方法在jobLauncher启动作业后被调用。所以,我没有得到失败的作业id。我已经尝试了很多链接。但我不知道如何获取失败或以前的作业 ID 并调用重新启动方法。请帮我。 (2认同)