Spring Batch: how to filter duplicates before they are sent to the ItemWriter

Aur*_*e77 5 spring batch-processing spring-batch

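I read a flat file (for example a .csv file with one line per user, e.g. UserId; Data1; Date2).

But how can I handle duplicate User items in the reader, where there is no list of previously read users?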

stepBuilderFactory.get("createUserStep1")
        .<User, User>chunk(1000)
        .reader(flatFileItemReader) // FlatFileItemReader
        .writer(itemWriter) // For example JDBC Writer
        .build();

Mic*_*lla 17

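Filtering is typically done with an ItemProcessor. If the ItemProcessor returns null, the item is filtered out and not passed to the ItemWriter; otherwise it is passed on. In your case, you could keep a list of previously seen users in the ItemProcessor. If the user hasn't been seen before, pass it on. If it has been seen before, return null. You can read more about filtering with an ItemProcessor in the documentation here: http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#filiteringRecords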

import java.util.HashSet;
import java.util.Set;

import org.springframework.batch.item.ItemProcessor;

/**
 * This implementation assumes that there is enough room in memory to hold every
 * User seen so far.  Otherwise, you'd want to store them somewhere you can do a look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() (and a matching hashCode()) identifies the duplicates
    private final Set<User> seenUsers = new HashSet<>();

    @Override
    public User process(User user) {
        // Returning null filters the item: it is not passed to the ItemWriter
        if (seenUsers.contains(user)) {
            return null;
        }
        seenUsers.add(user);
        return user;
    }
}
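
The filter above only works if User implements equals() and hashCode() consistently, because HashSet relies on both to detect duplicates. A minimal sketch of what that could look like, assuming the user ID alone identifies a duplicate (the User class is not shown in the question, so the fields userId and data are hypothetical):

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical User class: the real one is not shown in the question.
// Assumption: two records with the same userId count as duplicates.
final class User {
    private final String userId;
    private final String data;

    User(String userId, String data) {
        this.userId = userId;
        this.data = data;
    }

    String getUserId() { return userId; }
    String getData() { return data; }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof User)) return false;
        return userId.equals(((User) other).userId);
    }

    @Override
    public int hashCode() {
        // Must be derived from the same field(s) that equals() compares
        return Objects.hash(userId);
    }
}

class UserEqualityDemo {
    public static void main(String[] args) {
        Set<User> seenUsers = new HashSet<>();
        // Set.add returns false when an equal element is already present
        System.out.println(seenUsers.add(new User("42", "first line")));  // true
        System.out.println(seenUsers.add(new User("42", "second line"))); // false
        System.out.println(seenUsers.add(new User("43", "other user")));  // true
    }
}
```

If memory is a concern, keeping only the IDs in the set (a Set<String> in this sketch) rather than whole User objects would be one way to shrink it.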


小智 6

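As you can see here: http://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html#faultTolerant

When a chunk is rolled back, items that have been cached during reading may be reprocessed. If a step is configured to be fault tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in a way that is idempotent.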

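This means that in Michael's example, the first time a user is processed it is stored in the Set. If writing the item then fails and the step is fault tolerant, the processor is executed again for the same user, and this time the filter discards the user.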
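
The effect can be reproduced without Spring. The sketch below uses a stand-in ItemProcessor interface (an assumption so that the example runs on its own; the real one is org.springframework.batch.item.ItemProcessor) and a hypothetical minimal User, and it simulates a rollback simply by calling process() a second time on the same item:

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for Spring Batch's ItemProcessor so the sketch runs without Spring
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical minimal User; duplicates are identified by userId
final class User {
    final String userId;

    User(String userId) { this.userId = userId; }

    @Override
    public boolean equals(Object o) {
        return o instanceof User && userId.equals(((User) o).userId);
    }

    @Override
    public int hashCode() { return userId.hashCode(); }
}

// Behaves like the filter in Michael's answer
class UserFilterItemProcessor implements ItemProcessor<User, User> {
    private final Set<User> seenUsers = new HashSet<>();

    @Override
    public User process(User user) {
        // add() returns false if the user was already seen
        return seenUsers.add(user) ? user : null;
    }
}

class ReprocessingDemo {
    public static void main(String[] args) throws Exception {
        ItemProcessor<User, User> processor = new UserFilterItemProcessor();
        User user = new User("42");

        User firstAttempt = processor.process(user);
        // Simulated rollback: the same cached item is processed again
        User secondAttempt = processor.process(user);

        System.out.println("first attempt kept:    " + (firstAttempt != null));  // true
        System.out.println("second attempt kept:   " + (secondAttempt != null)); // false
    }
}
```

The second call returns null even though the user was never written, which is exactly the non-idempotent behaviour the documentation warns about.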

Improved code:

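import java.util.HashSet;
import java.util.Set;

import org.springframework.batch.item.ItemProcessor;

/**
 * This implementation assumes that there is enough room in memory to hold every
 * User seen so far.  Otherwise, you'd want to store them somewhere you can do a look-up on.
 */
public class UserFilterItemProcessor implements ItemProcessor<User, User> {

    // This assumes that User.equals() (and a matching hashCode()) identifies the duplicates
    private final Set<User> seenUsers = new HashSet<>();

    @Override
    public User process(User user) {
        // Filter only true duplicates: seen before, but this instance never passed the filter.
        // User is assumed to expose a "processed" flag via hasBeenProcessed()/setProcessed().
        if (seenUsers.contains(user) && !user.hasBeenProcessed()) {
            return null;
        } else {
            // Either a new user or the same instance being reprocessed after a rollback
            seenUsers.add(user);
            user.setProcessed(true);
            return user;
        }
    }
}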
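
A quick check of this version under a simulated retry, again as a sketch with a stand-in ItemProcessor interface and a hypothetical User carrying the processed flag. It assumes the step hands the same cached User instance back to the processor after a rollback; if a fresh instance were created instead, the flag would be lost and the user would be filtered again.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for Spring Batch's ItemProcessor so the sketch runs without Spring
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical User with the "processed" flag the improved filter relies on
final class User {
    final String userId;
    private boolean processed;

    User(String userId) { this.userId = userId; }

    boolean hasBeenProcessed() { return processed; }
    void setProcessed(boolean processed) { this.processed = processed; }

    @Override
    public boolean equals(Object o) {
        return o instanceof User && userId.equals(((User) o).userId);
    }

    @Override
    public int hashCode() { return userId.hashCode(); }
}

class UserFilterItemProcessor implements ItemProcessor<User, User> {
    private final Set<User> seenUsers = new HashSet<>();

    @Override
    public User process(User user) {
        if (seenUsers.contains(user) && !user.hasBeenProcessed()) {
            return null; // true duplicate: a different instance of a seen user
        }
        seenUsers.add(user);
        user.setProcessed(true);
        return user;
    }
}

class IdempotentFilterDemo {
    public static void main(String[] args) throws Exception {
        ItemProcessor<User, User> processor = new UserFilterItemProcessor();
        User first = new User("42");
        User duplicate = new User("42"); // second line in the file for the same user

        System.out.println(processor.process(first) != null);     // true: kept
        System.out.println(processor.process(duplicate) != null); // false: filtered

        // Simulated rollback: the same two cached instances are processed again
        System.out.println(processor.process(first) != null);     // true: still kept
        System.out.println(processor.process(duplicate) != null); // false: still filtered
    }
}
```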