完成后,分区的工作不能自行停止?春批

wok*_*kmi 4 java spring spring-batch partition

我写了两个步骤的作业,其中两个步骤之一是分区步骤.分区步骤使用TaskExecutorPartitionHandler并在线程中运行5个从属步骤.该作业在main()方法中启动.但是在每个从属ItemReader返回null(结束符号)之后它不会停止.即使程序运行了main()方法中的最后一行代码(即System.out.println("Finished")),程序进程也不会停止,挂在内存中,什么也不做.我必须按下Eclipse面板上的停止按钮才能停止程序.

以下是JobLauncher.run()返回的JobExecution的内容,表示作业运行的成功状态.

JobExecution:id = 0,version = 2,startTime = Fri Nov 27 06:05:23 CST 2015,endTime = Fri Nov 27 06:05:39 CST 2015,lastUpdated = Fri Nov 27 06:05:39 CST 2015,status = COMPLETED,exitStatus = exitCode = COMPLETED; exitDescription =,job = [JobInstance:id = 0,version = 0,Job = [jobCensoredPages]],jobParameters = [{}]

7217
完了

为什么运行成功的Spring Batch程序仍然挂起?请指出我在哪里工作.我怀疑Spring Batch管理的多线程部分没有停止..

简单的工作运行代码

        Job job = (Job) context.getBean("jobPages");
        try {
            JobParameters p=new JobParametersBuilder()
                .toJobParameters();

            JobExecution result = launcher.run(job, new JobParameters());

            System.out.println(result.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.getBean("idSet");
        AtomicInteger n=(AtomicInteger) context.getBean("pageCount");
        System.out.println(n.get());        
        System.out.println("Finished");
Run Code Online (Sandbox Code Playgroud)

对Patitioner和PatitionHandler的配置

    @Bean @Autowired 
    public PartitionHandler beanPartitionHandler(
        TaskExecutor beanTaskExecutor, 
        @Qualifier("beanStepSlave") Step beanStepSlave
        ) throws Exception
    {
        TaskExecutorPartitionHandler h=new TaskExecutorPartitionHandler();
        h.setGridSize(5);
        h.setTaskExecutor(beanTaskExecutor);
        h.setStep(beanStepSlave);   
        h.afterPropertiesSet(); 
        return h;
    }
    @Bean public TaskExecutor beanTaskExecutor() {
        ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
        e.setMaxPoolSize(5);
        e.setCorePoolSize(5);
        e.afterPropertiesSet();
        return e;
    }
Run Code Online (Sandbox Code Playgroud)

唯一的一步,它是奴隶的一步

@Bean public Step beanStepMaster(
        Step beanStepSlave,
        Partitioner beanPartitioner,
        PartitionHandler beanPartitionHandler
        )   throws Exception 
    {
        return stepBuilderFactory().get("stepMaster")
        .partitioner(beanStepSlave)
        .partitioner("stepSlave", beanPartitioner)
        .partitionHandler(partitionHandler)
        .build();
    }
    @Bean @Autowired 
    public Step beanStepSlave(
        ItemReader<String> beanReaderTest,
        ItemProcessor<String, String> beanProcessorTest,
        ItemWriter<String> beanWriterTest) throws Exception{
        return stepBuilderFactory().get("stepSlave")
            .<String, String>chunk(1)
            .reader(beanReaderTest)
            .processor(beanProcessorTest)
            .writer(beanWriterTest)
            .build();
    }
Run Code Online (Sandbox Code Playgroud)

我的pom.xml文件

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>RELEASE</version>      
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>

<dependency>    
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>RELEASE</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

小智 7

当我使用ThreadPoolTask​​Executor时,我在分区的Spring批处理应用程序挂起时也遇到了困难.另外,我看到执行程序不允许完成所有分区的工作.

我找到了解决这些问题的两种方法.

第一种解决方案是使用SimpleAsyncTaskExecutor而不是ThreadPoolTask​​Executor.如果您不介意重新创建线程的额外开销,这是一个简单的修复.

第二个解决方案是创建一个JobExecutionListener,它在ThreadPoolTask​​Executor上调用shutdown.

我创建了一个JobExecutionListener,如下所示:

@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
    return new JobExecutionListener() {
        private ThreadPoolTaskExecutor taskExecutor = executor;
        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            taskExecutor.shutdown();
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

并将其添加到我的Job定义中,如下所示:

@Bean
public Job partitionedJob(){
    return jobBuilders.get("partitionedJob")
            .listener(jobExecutionListener(taskExecutor()))
            .start(partitionedStep())
            .build();
}
Run Code Online (Sandbox Code Playgroud)


nis*_*run 6

以上所有答案都是黑客/解决方法。问题中发布的问题的根本原因是 threadPoolTask​​Executor 不共享步骤的生命周期。因此,在销毁 step/job context 时,线程池不会自动销毁并且永远运行。将 threadPoolExecutor 放在 stepContext "@StepScope" 中应该可以解决问题。Spring 负责销毁它。

@Bean @StepScope public ThreadPoolTaskExecutor threadPoolTaskExecutor() {


Thr*_*rax 2

尽管我不知道原因,但您的问题有两种解决方案。

首先,您可以使用 aCommandLineJobRunner来启动Job. 请参阅此处的文档。此类在作业结束时自动退出程序,并将 ExitStatus 转换为返回代码 ( COMPLETED= 0, FAILED= 1...)。默认返回码由SimpleJvmExitCodeMapper.

第二种解决方案是System.exit()JobLauncher.run(). 您还可以手动转换 的ExitStatusJob在手动退出中使用它:

// Create Job
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean(jobName);

// Create return codes mapper
SimpleJvmExitCodeMapper mapper = new SimpleJvmExitCodeMapper();

// Start Job
JobExecution execution = jobLauncher.run(job, new JobParameters());

// Close context
context.close();

// Map codes and exit
String status = execution.getExitStatus().getExitCode();
Integer returnCode = mapper.intValue(status);
System.exit(returnCode);
Run Code Online (Sandbox Code Playgroud)