Spring批处理 - 并行运行多个作业

lju*_*tin 4 parallel-processing spring spring-integration spring-batch

我是Spring批次的新手,无法弄清楚如何做到这一点..

基本上我有一个spring文件轮询器,每隔N分钟运行一次,在某个目录中查找带有某个名字的文件(例如:A.txt&B.txt).在任何时候,此目录中可能有最多2个文件(A和B).通过Spring Batch Job,这两个文件将被处理并持久保存到2个不同的DB表中.

这些文件有些类似,因此使用相同的处理器/写入器.

现在我设置的方式,每个轮询周期1文件被选中并且作业运行.

假设目录中有2个文件(A.txt和B.txt),有没有办法创建2个作业,以便两个作业可以并行运行?

Dan*_* C. 11

为了在Spring中以异步模式运行作业,有非常好的方法,这只是如何配置的问题JobLauncher.该JobLauncher有一个taskExecutor属性和异步执行可以根据分配给该属性的实现被激活.

您可以找到TaskExecutorSpring可以提供的所有类型,并根据您的需要选择最佳方法来完成批量异步作业. Spring中的任务执行器类型

例如SimpleAsyncTaskExecutor,一个任务执行器将Thread在任何调用上创建一个新的,并且如果执行以高频率运行,则可能产生性能问题.另一方面,还有一些TaskExecutors类型提供池化功能,以便重用资源并最大化系统效率.

以下是配置a的一个小例子ThreadPoolTaskExecutor:

A)配置ThreadPoolTask​​Executor Bean

@Bean
    public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(15);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(30);
    return taskExecutor;
}
Run Code Online (Sandbox Code Playgroud)

B)配置JobLauncher Bean

   @Bean
    public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }
Run Code Online (Sandbox Code Playgroud)

C)注入您JobLauncherJobs配置

@Autowired
private JobLauncher jobLauncher;

@Autowired
@Qualifier("job1-file-A")
private Job job1;

@Autowired
@Qualifier("job2-file-B")
private Job job2;
Run Code Online (Sandbox Code Playgroud)

D)安排工作

@Scheduled(cron = "*/1 * * * * *")
public void run1(){
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        jobLauncher.run(job1, jobParameters);
    }catch (Exception ex){
        logger.error(ex.getMessage());
    }

}

@Scheduled(cron = "*/1 * * * * *")
public void run2(){
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        jobLauncher.run(job2, jobParameters);
    }catch (Exception ex){
        logger.error(ex.getMessage());
    }

}
Run Code Online (Sandbox Code Playgroud)

E)最后在你的SpringBoot类@EnableBatchProcessing@EnableScheduling

@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {
Run Code Online (Sandbox Code Playgroud)


Lac*_*lev 7

我相信你可以。由于您是春季批处理的新手(就像我一样),我建议您仔细阅读批处理的域语言(如果您还没有这样做的话)。

然后,您可以从配置自己的异步开始 JobLauncher。例如:

  @Bean
  public JobLauncher jobLauncher() throws Exception
  {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();

    return jobLauncher;
  }
Run Code Online (Sandbox Code Playgroud)

要特别注意SimpleAsyncTaskExecutor(作业仓库可以自动接线)。此配置将允许异步执行,如下图所示:

异步

将其与同步执行流进行比较:

在此处输入图片说明

也许还可以帮助引用SimpleJobLauncherJava文档:

JobLauncher接口的简单实现。Spring Core TaskExecutor界面用于启动Job。这意味着执行程序集的类型非常重要。如果使用SyncTaskExecutor,则将在称为启动器的同一线程中处理作业。应该注意确保此类的所有用户都完全了解所使用的TaskExecutor的实现是否将同步或异步启动任务。默认设置使用同步任务执行器。

更多详细信息和配置选项- 在此处

最后,只需使用不同的名称创建作业和/或使用不同的参数集启动它们。天真的例子是:

  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  public Job createJobA() {
    return jobBuilderFactory.get("A.txt")
                            .incrementer(new RunIdIncrementer())
                            .flow(step1())
                            .next(step2())
                            .end()
                            .build();
  }

  public Job createJobB() {
    return jobBuilderFactory.get("B.txt")
                            .incrementer(new RunIdIncrementer())
                            .flow(step1())
                            .next(step2())
                            .end()
                            .build();
  }
Run Code Online (Sandbox Code Playgroud)

使用异步作业启动器执行这些作业将创建两个并行执行的作业实例。这只是一个选择,可能适合您的情况,也可能不适合您的情况。

  • 很棒的例子。如果您可以列出如何在主线程中捕获作业完成情况的方法,这也会非常有益。只是为了完成伟大的答案!:) (2认同)