我有一个Tasklet并想计算已处理的项目。然后,公共StepExecutionListener应该能够读取这些已处理的项目计数afterStep():
@Bean
public Step myStep() {
return stepBuilderFactory.get("Step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
int items = dao.deleteItems(); //how to pass these items to a StepExecutionListener?
return RepeatStatus.FINISHED;
}
})
.build();
@Component
public class MyListener extends StepExecutionListenerSupport {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
long items = stepExecution.getWriteCount();
return super.afterStep(stepExecution);
}
}
Run Code Online (Sandbox Code Playgroud)
如何将处理后的项目放入stepExecutiontasklet 中?
我有一个包含三个步骤的 Spring Batch 工作。第一步运行一次,第二步和第三步针对队列中的每个项目运行。该工作的代码:
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.listener(jobListener())
.flow(step1())
.next(step2())
.next(encrypt()).on("CONTINUE")
.to(step2()).on("COMPLETED")
.end().end().build();
Run Code Online (Sandbox Code Playgroud)
用于重复步骤的步骤执行监听器:
@AfterStep
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if (queue.size() > 0) {
return new ExitStatus("CONTINUE");
} else {
return new ExitStatus("COMPLETED");
}
}
Run Code Online (Sandbox Code Playgroud)
队列是自动连接的。
我的问题是,即使步骤完成时没有失败的退出状态,作业仍然以失败的退出状态结束。
我已经对其进行了调试,以查看作业是否因步骤以“COMPLETED”以外的退出状态完成而失败,但事实并非如此。
有任何想法吗?我认为这可能与其中一个步骤没有特定的听众有关,但我不知道那可能是什么。如果需要的话我可以发布更多代码。感谢所有帮助。
日志跟踪:
2017-05-18 10:08:10.327 [INFO ] c.d.r.r.BatchApplication - Starting BatchApplication on <computer_name> with PID <pid>
2017-05-18 10:08:10.329 [DEBUG] c.d.r.r.BatchApplication - Running with Spring Boot v1.4.1.RELEASE, Spring v4.3.3.RELEASE
2017-05-18 10:08:10.330 [INFO ] c.d.r.r.BatchApplication - The following profiles are active: <profile> …Run Code Online (Sandbox Code Playgroud) 我的 Spring Boot 批处理应用程序有一个调度程序,它安排我在 FirstBatchConfiguration 类中编写的批处理作业每小时运行一次。
我有另一个批处理作业,我在同一个应用程序的 SecondBatchConfiguration 类中配置了该作业,我将安排该作业每周运行一次,但我无法弄清楚如何在同一个 JobScheduler 中安排它,以便两个作业都应该运行在他们自己安排的时间。
如果有人可以帮助我实现这一目标。
我当前的调度程序是:
@Component
public class JobScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private FirstBatchConfiguration firstBatchConfiguration;
@Scheduled(cron = "0 0 0/1 * * ?")
public void runJob() {
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
final Logger logger = LoggerFactory.getLogger("applicationlogger");
try {
jobLauncher.run(firstBatchConfiguration.firstJob(), jobParameters);
} catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException | org.springframework.batch.core.repository.JobRestartException e) {
logger.error(e.getMessage());
}
}
}
Run Code Online (Sandbox Code Playgroud) 所以我觉得我已经正确定义了我的作业流程,但是当我运行它时,我收到一个 FlowExecutionException ,说我的流程“d”的“在流程中找不到下一个状态”。
我正在使用一个实现 JobExecutionListener 的 tasklet,如下所示,它似乎按预期工作:
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
if(a && b) {
return new ExitStatus("a");
} else if(c) {
return new ExitStatus("b");
} else if(d) {
return new ExitStatus("c");
} else {
return new ExitStatus("d");
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的工作配置。我还想简化它(所有步骤都将进行“b”,所以我希望这是“之后”的事情),但只是希望它首先工作。
@Bean
public Job job() {
LOGGER.info("STARTING BATCH JOB");
return jobs.get("job")
.incrementer(new RunIdIncrementer())
.start(taskletStep())
.on("a")
.to(step1a)
.next(step2)
.on("b")
.to(step2)
.on("c")
.to(step2)
.on("d")
.to(step1b)
.next(step2)
.on("*").end()
.end()
.build();
}
Run Code Online (Sandbox Code Playgroud)
我知道我一定错过了流程的某些部分,但我无法确定它。
我了解 JobRepository 用于作业状态的 CRUD 操作。我正在使用持久性数据库,JobRepository 会在数据库中保留历史元数据还是只存储当前正在运行的进程?
另外,如果我有一系列由作业调度程序执行的作业,并且每个作业都有自己的 JobRepository 数据库,它们会共享相同的持久表还是我必须为每个 JobRepository 创建不同的数据库?
我正在制作一个 Spring Batch 应用程序,其中有 tasklet 和处理器。在处理器中,我抛出异常,我的需要是根据我抛出的异常来 System.exit() 应用程序。
但我不知道该怎么做。
感谢您的回复
我是 Spring 批处理的新手。我创建了一个决策程序,它将 FlowExecutionStatus 返回为“YES”/“NO”。基于FlowExecutionStatus,我需要调用step2()或step3()。
在我下面的代码中,step2()在决策程序之前被调用。如何修改代码以便调用决策程序,并根据FlowExecutionStatus返回的 bu调用step2()或step3()应该调用决策程序。请帮忙。
@Autowired
private NumberDecider decider;
@Bean
public Job NumberLoaderJob() throws NumberFormatException, IOException {
return jobBuilderFactory.get("numberLoaderJob").start(step1()).listener(new MyNumberJobListener())
.next(decider).on("YES").to(step2())
.from(decider).on("NO").to(step3()).end().build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Exiting step1() execute()");
return RepeatStatus.FINISHED;
}
}).build();
}
/**
* Step 2
*
* @return
* @throws NumberFormatException
* @throws IOException
*/ …Run Code Online (Sandbox Code Playgroud) 我正在使用带有 OpenCSV 的 Spring Batch Tasklet 来读取我的 CSV 文件。在问这个问题之前,我知道块,但是在后面的步骤中文件之间存在交叉验证,所以我必须继续使用Tasklet。
我要做的是向我的报告步骤报告丢失的文件或解析错误。我不确定向下一步报告失败的正确方法应该是什么。我有以下代码。
读取文件的初始步骤。
public class CsvBatchReader<T> implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory.getLogger(CsvBatchReader.class);
private List batch;
private final Class<T> clazz;
private Path path;
public CsvBatchReader(Class<T> clazz, Path path) {
this.clazz = clazz;
this.path = path;
}
@Override
public void beforeStep(StepExecution stepExecution) {
logger.info("Reader initialized - " + clazz.getSimpleName());
batch = new ArrayList();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.info("Reader ended - " + clazz.getSimpleName());
return ExitStatus.COMPLETED;
}
@Override …Run Code Online (Sandbox Code Playgroud) 给定一个为其配置了实例列表的Spring Batch作业,JobExecutionListener每个侦听器的执行顺序是什么。例子 :
<job id="myJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="myJobExecutionListener1" />
<batch:listener ref="myJobExecutionListener2" />
<batch:listener ref="myJobExecutionListener3" />
<batch:listener ref="myJobExecutionListener4" />
</batch:listeners>
<!-- job config continues -->
</job>
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,有没有保证监听器会按照配置的顺序执行,还是会按照随机的顺序执行。我尝试查看Spring Batch参考文档,但就我的研究而言,我找不到这个文档。
我正在尝试配置 Spring Batch 以使用 PostGres DB。我在我的build.gradle.kts文件中包含了以下依赖项:
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.postgresql:postgresql")
Run Code Online (Sandbox Code Playgroud)
我application.yml的 SpringBatch 模块包含以下内容:
spring:
datasource:
url: jdbc:postgresql://postgres:5432/springbatchdb
username: postgres
password: root
driverClassName: org.postgresql.Driver
Run Code Online (Sandbox Code Playgroud)
docker-compose.yml
postgres:
restart: always
image: postgres:12-alpine
container_name: postgres
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=root
- POSTGRES_DB=springbatchdb
ports:
- "5432:5432"
volumes:
- postgresql:/var/lib/postgresql
- postgresql_data:/var/lib/postgresql/data
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试添加数据文件时,我在 SpringBatch Docker 容器和 PostGres 容器的日志中看到以下错误:
春季批次:
<<< Exception in method: org.meanwhileinhell.spring.batch.server.SpringBatchController.handle Error Message: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?]; nested …Run Code Online (Sandbox Code Playgroud)