shi*_*kar 2 mongodb spring-batch
@Bean
public Job orderJob() throws Exception {
return jobBuilderFactory.get("orderJob").incrementer(new RunIdIncrementer()).listener(listener())
.flow(orderStep()).end().build();
}
@Bean
public Step orderStep() throws Exception {
return stepBuilderFactory.get("orderStep").<OrderCollection, Order>chunk(1000)
.reader(orderReader()).processor(orderProcessor()).writer(orderWriter())
.allowStartIfComplete(true).build();
}
@Bean
@StepScope
public MongoItemReader<OrderCollection> orderReader() throws Exception {
MongoItemReader<OrderCollection> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setCollection("order");
Map<String, Sort.Direction> sort = new HashMap<>();
sort.put("_id", Sort.Direction.ASC);
reader.setSort(sort);
reader.setTargetType(OrderCollection.class);
reader.setQuery("{$or: [ {flag:false}, {flag:null} ]}");
return reader;
}
@Bean
@StepScope
public OrderProcessor orderProcessor() {
return new OrderProcessor();
}
@Bean
@StepScope
public ItemWriter<Order> orderWriter() {
return new OrderWriter();
}
Run Code Online (Sandbox Code Playgroud)
Order Collection 中有 5686 条记录,对于所有记录,如果为 false 则标记为标志。但读取器在第一次运行时仅读取和处理 3000 条记录。第二次运行 1686 条记录,第三次运行 1000 条记录。没有错误仅供参考
小智 6
我猜您可能正在更新您从中读取的集合,并且您还更新了查询正在使用的字段。如果是这样,那么我最近遇到了同样的问题。
MongoItemReader 是一个分页阅读器。因此,每次写入者更新这些记录时,读取者的池较小,但页面仍在增加。
因此,假设我们有 20 个项目并一次读取 5 个项目:
1) 阅读 1-5 项,共 20 项。
2) 更新项目 1-5,现在总共有 15 个可能的项目
3) 阅读 15 项中的 6-10 项。
4) 更新了 6-10 项,现在总共有 10 个可能的项。
5) 阅读 10 个可能的项目中的第 11-15 项
6) Read 返回 null,因为该页面没有返回任何内容。
所以现在你只处理了一半。
我按照下面的教程创建了一个 MongoDbCursorItemReader,它为我解决了这个问题:https ://blog.zenika.com/2012/05/23/spring-batch-and-mongodb-cursor-based-item-reader/
| 归档时间: |
|
| 查看次数: |
1642 次 |
| 最近记录: |