wok*_*kmi 4 java spring spring-batch partition
我写了两个步骤的作业,其中两个步骤之一是分区步骤.分区步骤使用TaskExecutorPartitionHandler并在线程中运行5个从属步骤.该作业在main()方法中启动.但是在每个从属ItemReader返回null(结束符号)之后它不会停止.即使程序运行了main()方法中的最后一行代码(即System.out.println("Finished")),程序进程也不会停止,挂在内存中,什么也不做.我必须按下Eclipse面板上的停止按钮才能停止程序.
以下是JobLauncher.run()返回的JobExecution的内容,表示作业运行的成功状态.
JobExecution:id = 0,version = 2,startTime = Fri Nov 27 06:05:23 CST 2015,endTime = Fri Nov 27 06:05:39 CST 2015,lastUpdated = Fri Nov 27 06:05:39 CST 2015,status = COMPLETED,exitStatus = exitCode = COMPLETED; exitDescription =,job = [JobInstance:id = 0,version = 0,Job = [jobCensoredPages]],jobParameters = [{}]
7217
完了
为什么运行成功的Spring Batch程序仍然挂起?请指出我在哪里工作.我怀疑Spring Batch管理的多线程部分没有停止..
简单的工作运行代码
Job job = (Job) context.getBean("jobPages");
try {
JobParameters p=new JobParametersBuilder()
.toJobParameters();
JobExecution result = launcher.run(job, new JobParameters());
System.out.println(result.toString());
} catch (Exception e) {
e.printStackTrace();
}
context.getBean("idSet");
AtomicInteger n=(AtomicInteger) context.getBean("pageCount");
System.out.println(n.get());
System.out.println("Finished");
Run Code Online (Sandbox Code Playgroud)
对Patitioner和PatitionHandler的配置
@Bean @Autowired
public PartitionHandler beanPartitionHandler(
TaskExecutor beanTaskExecutor,
@Qualifier("beanStepSlave") Step beanStepSlave
) throws Exception
{
TaskExecutorPartitionHandler h=new TaskExecutorPartitionHandler();
h.setGridSize(5);
h.setTaskExecutor(beanTaskExecutor);
h.setStep(beanStepSlave);
h.afterPropertiesSet();
return h;
}
@Bean public TaskExecutor beanTaskExecutor() {
ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
e.setMaxPoolSize(5);
e.setCorePoolSize(5);
e.afterPropertiesSet();
return e;
}
Run Code Online (Sandbox Code Playgroud)
唯一的一步,它是奴隶的一步
@Bean public Step beanStepMaster(
Step beanStepSlave,
Partitioner beanPartitioner,
PartitionHandler beanPartitionHandler
) throws Exception
{
return stepBuilderFactory().get("stepMaster")
.partitioner(beanStepSlave)
.partitioner("stepSlave", beanPartitioner)
.partitionHandler(partitionHandler)
.build();
}
@Bean @Autowired
public Step beanStepSlave(
ItemReader<String> beanReaderTest,
ItemProcessor<String, String> beanProcessorTest,
ItemWriter<String> beanWriterTest) throws Exception{
return stepBuilderFactory().get("stepSlave")
.<String, String>chunk(1)
.reader(beanReaderTest)
.processor(beanProcessorTest)
.writer(beanWriterTest)
.build();
}
Run Code Online (Sandbox Code Playgroud)
我的pom.xml文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>RELEASE</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
小智 7
当我使用ThreadPoolTaskExecutor时,我在分区的Spring批处理应用程序挂起时也遇到了困难.另外,我看到执行程序不允许完成所有分区的工作.
我找到了解决这些问题的两种方法.
第一种解决方案是使用SimpleAsyncTaskExecutor而不是ThreadPoolTaskExecutor.如果您不介意重新创建线程的额外开销,这是一个简单的修复.
第二个解决方案是创建一个JobExecutionListener,它在ThreadPoolTaskExecutor上调用shutdown.
我创建了一个JobExecutionListener,如下所示:
@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
return new JobExecutionListener() {
private ThreadPoolTaskExecutor taskExecutor = executor;
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
taskExecutor.shutdown();
}
};
}
Run Code Online (Sandbox Code Playgroud)
并将其添加到我的Job定义中,如下所示:
@Bean
public Job partitionedJob(){
return jobBuilders.get("partitionedJob")
.listener(jobExecutionListener(taskExecutor()))
.start(partitionedStep())
.build();
}
Run Code Online (Sandbox Code Playgroud)
以上所有答案都是黑客/解决方法。问题中发布的问题的根本原因是 threadPoolTaskExecutor 不共享步骤的生命周期。因此,在销毁 step/job context 时,线程池不会自动销毁并且永远运行。将 threadPoolExecutor 放在 stepContext "@StepScope" 中应该可以解决问题。Spring 负责销毁它。
@Bean
@StepScope
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
尽管我不知道原因,但您的问题有两种解决方案。
首先,您可以使用 aCommandLineJobRunner来启动Job. 请参阅此处的文档。此类在作业结束时自动退出程序,并将 ExitStatus 转换为返回代码 ( COMPLETED= 0, FAILED= 1...)。默认返回码由SimpleJvmExitCodeMapper.
第二种解决方案是System.exit()在JobLauncher.run(). 您还可以手动转换 的ExitStatus并Job在手动退出中使用它:
// Create Job
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean(jobName);
// Create return codes mapper
SimpleJvmExitCodeMapper mapper = new SimpleJvmExitCodeMapper();
// Start Job
JobExecution execution = jobLauncher.run(job, new JobParameters());
// Close context
context.close();
// Map codes and exit
String status = execution.getExitStatus().getExitCode();
Integer returnCode = mapper.intValue(status);
System.exit(returnCode);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4155 次 |
| 最近记录: |