Spring Batch:一个阅读器,多个处理器和编写器

dan*_*emi 19 spring spring-batch itemwriter itemprocessor

在Spring批处理中,我需要将ItemReader读取的项目传递给两个不同的处理器和编写器.我想要实现的是......

                        +---> ItemProcessor#1 ---> ItemWriter#1
                        |
ItemReader ---> item ---+
                        |
                        +---> ItemProcessor#2 ---> ItemWriter#2

这是必要的,因为通过ItemWriter#1写入项目应在一个完全不同的方式来处理与通过ItemWriter#2写入的那些.此外,ItemReader从数据库中读取项目,并且它执行的查询计算成本很高,因此应该丢弃执行两次相同的查询.

有关如何实现此类设置的任何提示?或者,至少,逻辑上等同的设置?

Luc*_*cci 11

如果您的项目应由处理器#1和处理器#2处理,则此解决方案有效

您必须使用此签名创建处理器#0:

class Processor0<Item, CompositeResultBean>
Run Code Online (Sandbox Code Playgroud)

CompositeResultBeanbean定义为哪里

class CompositeResultBean {
  Processor1ResultBean result1;
  Processor2ResultBean result2;
}
Run Code Online (Sandbox Code Playgroud)

在您的Processor#0中,只需将工作委托给处理器#1和#2并将结果输入 CompositeResultBean

CompositeResultBean Processor0.process(Item item) {
  final CompositeResultBean r = new CompositeResultBean();
  r.setResult1(processor1.process(item));
  r.setResult2(processor2.process(item));
  return r;
}
Run Code Online (Sandbox Code Playgroud)

你自己的作家是作家的CompositeItemWriter代表CompositeResultBean.result1CompositeResultBean.result2(看看PropertyExtractingDelegatingItemWriter,也许可以帮忙)


Jua*_*o G 6

我遵循了 Luca 的建议,将其PropertyExtractingDelegatingItemWriter用作作家,并且我能够在一个步骤中处理两个不同的实体。

首先我所做的是定义一个 DTO 来存储来自处理器的两个实体/结果

public class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;

    public AccessLogEntry getAccessLogEntry() {
        return accessLogEntry;
    }

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
        this.accessLogEntry = accessLogEntry;
    }

    public BlockedIp getBlockedIp() {
        return blockedIp;
    }

    public void setBlockedIp(BlockedIp blockedIp) {
        this.blockedIp = blockedIp;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我将这个 DTO 传递给了PropertyExtractingDelegatingItemWriter编写器,在该类中我定义了两个自定义方法将实体写入数据库,请参阅下面的编写器代码:

@Configuration
public class LogWriter extends LogAbstract {
    @Autowired
    private DataSource dataSource;

    @Bean()
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
        PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
        propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
        propertyExtractingDelegatingItemWriter.setTargetObject(this);
        propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
        return propertyExtractingDelegatingItemWriter;
    }

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
        writeAccessLogTable(accessLogEntry);
        if (blockedIp != null) {
            writeBlockedIp(blockedIp);
        }

    }

    private void writeBlockedIp(BlockedIp entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
        statement.setString(1, entry.getIp());
        statement.setInt(2, threshold);
        statement.setTimestamp(3, Timestamp.valueOf(startDate));
        statement.setTimestamp(4, Timestamp.valueOf(endDate));
        statement.setString(5, entry.getComment());
        statement.execute();
    }

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
        statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
        statement.setString(2, entry.getIp());
        statement.setString(3, entry.getRequest());
        statement.setString(4, entry.getStatus());
        statement.setString(5, entry.getUserAgent());
        statement.execute();
    }
}
Run Code Online (Sandbox Code Playgroud)

使用这种方法,您可以从单个读取器获得所需的初始行为,以处理多个实体并在一个步骤中保存它们。