rra*_*ram 3 java spring spring-batch spring-boot spring-batch-job-monitoring
我遵循了spring 批处理文档,但无法异步运行我的工作。
所以我从 web 容器运行作业,作业将通过 REST 端点触发。
我想在完成整个工作之前获得 JobInstance ID以作为响应传递它。因此,他们可以稍后使用 JobInstance ID 检查作业的状态,而无需等待。但我无法让它工作。下面是我试过的示例代码。请让我知道我错过了什么或错了什么。
BatchConfig 使 Async JobLauncher
@Configuration
public class BatchConfig {
@Autowired
JobRepository jobRepository;
@Bean
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
Run Code Online (Sandbox Code Playgroud)
控制器
@Autowired
JobLauncher jobLauncher;
@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().
addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
System.out.println(jobExecution.getJobInstance().getInstanceId());
System.out.println("OK RESPONSE");
return jobExecution.getJobInstance().getInstanceId();
}
Run Code Online (Sandbox Code Playgroud)
和 JobBuilder 作为组件
@Component
public class BatchComponent {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Job customJob(String someParam) throws Exception {
return jobBuilderFactory.get("personProcessor")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(personPorcessStep(someParam)).end().build();
}
private Step personPorcessStep(String someParam) throws Exception {
return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
.reader(new PersonReader(someParam)).faultTolerant().
skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
.writer(new PersonWriter()).build();
}
private JobExecutionListener listener() {
return new PersonJobCompletionListener();
}
private class PersonInput {
String firstName;
public PersonInput(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
private class PersonOutput {
String firstName;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
public class PersonReader implements ItemReader<PersonInput> {
private List<PersonInput> items;
private int count = 0;
public PersonReader(String someParam) throws InterruptedException {
Thread.sleep(10000L); //to simulate processing
//manipulate and provide data in the read method
//just for testing i have given some dummy example
items = new ArrayList<PersonInput>();
PersonInput pi = new PersonInput("john");
items.add(pi);
}
@Override
public PersonInput read() {
if (count < items.size()) {
return items.get(count++);
}
return null;
}
}
public class DataDuplicateSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
if (exception instanceof DataIntegrityViolationException) {
return true;
}
return true;
}
}
private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {
@Override
public PersonOutput process(PersonInput item) throws Exception {
return null;
}
}
private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
@Override
public void write(List<? extends PersonOutput> results) throws Exception {
return;
}
}
private class PersonJobCompletionListener implements JobExecutionListener {
public PersonJobCompletionListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JOB COMPLETED");
}
}
}
Run Code Online (Sandbox Code Playgroud)
主功能
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
我正在使用基于注释的配置,并将 gradle 与以下批处理包一起使用。
compile('org.springframework.boot:spring-boot-starter-batch')
Run Code Online (Sandbox Code Playgroud)
如果需要更多信息,请告诉我。我找不到任何示例来运行这个常见用例。
谢谢你的时间。
试试这个,在您的配置中,您需要使用@Bean(name = "myJobLauncher")创建带有SimpleAsyncTaskExecutor的customJobLauncher,并且将在您的控制器中使用@Qualifier。
@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
Run Code Online (Sandbox Code Playgroud)
在您的控制器中
@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
Run Code Online (Sandbox Code Playgroud)
如果我查看你的代码,我会发现一些错误。首先,您的自定义配置未加载,因为如果加载,则同一接口的重复 bean 实例的注入将会失败。
Spring Boot 有很多魔力,但是如果你不告诉他进行一些组件扫描,则不会按预期加载任何内容。
我看到的第二个问题是您的 BatchConfig 类:它不会扩展 DefaultBatchConfigure,也不会覆盖 getJobLauncher(),因此即使启动魔法会加载所有内容,您也会获得默认的。这是一个可以使用的配置,并且它符合文档@EnableBatchProcessing API
批量配置
@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {
@Override
@Bean
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
return super.getJobLauncher();
}
}
}
Run Code Online (Sandbox Code Playgroud)
主功能
@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
小智 1
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);。Joblauncher 将在作业完成后等待,然后再返回任何内容,这就是为什么您的服务可能需要很长时间才能响应(如果这是您的问题)。如果您想要异步功能,您可能需要看看 Spring 的@EnableAsync& @Async。
| 归档时间: |
|
| 查看次数: |
13222 次 |
| 最近记录: |