lju*_*tin 4 parallel-processing spring spring-integration spring-batch
我是Spring批次的新手,无法弄清楚如何做到这一点..
基本上我有一个spring文件轮询器,每隔N分钟运行一次,在某个目录中查找带有某个名字的文件(例如:A.txt&B.txt).在任何时候,此目录中可能有最多2个文件(A和B).通过Spring Batch Job,这两个文件将被处理并持久保存到2个不同的DB表中.
这些文件有些类似,因此使用相同的处理器/写入器.
现在我设置的方式,每个轮询周期1文件被选中并且作业运行.
假设目录中有2个文件(A.txt和B.txt),有没有办法创建2个作业,以便两个作业可以并行运行?
Dan*_* C. 11
为了在Spring中以异步模式运行作业,有非常好的方法,这只是如何配置的问题JobLauncher.该JobLauncher有一个taskExecutor属性和异步执行可以根据分配给该属性的实现被激活.
您可以找到TaskExecutorSpring可以提供的所有类型,并根据您的需要选择最佳方法来完成批量异步作业. Spring中的任务执行器类型
例如SimpleAsyncTaskExecutor,一个任务执行器将Thread在任何调用上创建一个新的,并且如果执行以高频率运行,则可能产生性能问题.另一方面,还有一些TaskExecutors类型提供池化功能,以便重用资源并最大化系统效率.
以下是配置a的一个小例子ThreadPoolTaskExecutor:
A)配置ThreadPoolTaskExecutor Bean
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(30);
return taskExecutor;
}
Run Code Online (Sandbox Code Playgroud)
B)配置JobLauncher Bean
@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
Run Code Online (Sandbox Code Playgroud)
C)注入您JobLauncher的Jobs配置
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("job1-file-A")
private Job job1;
@Autowired
@Qualifier("job2-file-B")
private Job job2;
Run Code Online (Sandbox Code Playgroud)
D)安排工作
@Scheduled(cron = "*/1 * * * * *")
public void run1(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job1, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
@Scheduled(cron = "*/1 * * * * *")
public void run2(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(job2, jobParameters);
}catch (Exception ex){
logger.error(ex.getMessage());
}
}
Run Code Online (Sandbox Code Playgroud)
E)最后在你的SpringBoot类@EnableBatchProcessing和@EnableScheduling
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {
Run Code Online (Sandbox Code Playgroud)
我相信你可以。由于您是春季批处理的新手(就像我一样),我建议您仔细阅读批处理的域语言(如果您还没有这样做的话)。
然后,您可以从配置自己的异步开始 JobLauncher。例如:
@Bean
public JobLauncher jobLauncher() throws Exception
{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
Run Code Online (Sandbox Code Playgroud)
要特别注意SimpleAsyncTaskExecutor(作业仓库可以自动接线)。此配置将允许异步执行,如下图所示:
将其与同步执行流进行比较:
也许还可以帮助引用SimpleJobLauncherJava文档:
JobLauncher接口的简单实现。Spring Core TaskExecutor界面用于启动Job。这意味着执行程序集的类型非常重要。如果使用SyncTaskExecutor,则将在称为启动器的同一线程中处理作业。应该注意确保此类的所有用户都完全了解所使用的TaskExecutor的实现是否将同步或异步启动任务。默认设置使用同步任务执行器。
更多详细信息和配置选项- 在此处。
最后,只需使用不同的名称创建作业和/或使用不同的参数集启动它们。天真的例子是:
@Autowired
public JobBuilderFactory jobBuilderFactory;
public Job createJobA() {
return jobBuilderFactory.get("A.txt")
.incrementer(new RunIdIncrementer())
.flow(step1())
.next(step2())
.end()
.build();
}
public Job createJobB() {
return jobBuilderFactory.get("B.txt")
.incrementer(new RunIdIncrementer())
.flow(step1())
.next(step2())
.end()
.build();
}
Run Code Online (Sandbox Code Playgroud)
使用异步作业启动器执行这些作业将创建两个并行执行的作业实例。这只是一个选择,可能适合您的情况,也可能不适合您的情况。
| 归档时间: |
|
| 查看次数: |
16333 次 |
| 最近记录: |