Spring批处理聚合值并写入单个值

Pra*_*lar 6 java spring spring-batch

我正在使用 spring 批处理,我需要实现以下目标

  1. 读取包含日期和金额等详细信息的 csv 文件
  2. 汇总同一日期所有金额的总和
  3. 保留一个带有日期和总和的条目

我过去使用过批处理,我想到了以下方法。用 2 个步骤创建一个批次。

第1步:

  1. Reader:使用 FlatFileItemReader 遍历整个文件
  2. 处理器:使用键作为日期和值作为数量填充地图。如果存在条目,则获取该值并将其添加到新值中
  3. 作家:没有操作作家,因为我不想写

第2步:

  1. 阅读器:遍历地图的值
  2. 作家:坚持价值观

我能够实现我填充Map. 这Map已声明为@JobScope

我被困在如何为 step2 创建读取器,它只需要读取值列表。我试过了,ListItemReader但我无法MapListItemReader.

请提出解决方案,或者您是否有更好的方法来解决这个问题

谢谢

Han*_*ier 5

选项 1:如果您的简历已按日期排序,您可以实现一个组读取器,该组读取器会读取行,直到键值发生更改。之后,整个组可以作为一项传递给处理器。

这样的小组阅读器可能如下所示:

  private SingleItemPeekableItemReader<I> reader;
  private ItemReader<I> peekReaderDelegate;

  @Override
  public void afterPropertiesSet() throws Exception {
    Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
    this.reader= new SingleItemPeekableItemReader<I>();
    this.reader.setDelegate(peekReaderDelegate);
  }

  @Override
  // GroupDTO is just a simple container. It is also possible to use
  // List<I> instead of GroupDTO<I>
  public GroupDTO<I> read() throws Exception {
    State state = State.NEW; // a simple enum with the states NEW, READING, and COMPLETE
    GroupDTO<I> group = null;
    I item = null;

    while (state != State.COMPLETE) {
      item = reader.read();

      switch (state) {
        case NEW: {
          if (item == null) {
            // end reached
            state = State.COMPLETE;
            break;
          }

          group = new GroupDTO<I>();
          group.addItem(item);
          state = State.READING;
          I nextItem = reader.peek();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        case READING: {
          group.addItem(item);

          // peek and check if there the peeked entry has a new date
          I nextItem = peekEntry();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        default: {
          throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state");
        }
      }
    }

    return group;
  }
Run Code Online (Sandbox Code Playgroud)

您需要一个 SingleItemPeekableItemReader,以便预读取下一个元素。这一篇涵盖了您的普通读者。

选项 2:第一步按照您的建议进行,但只需为步骤 2 编写一个 tasklet。无需使用读取器-进程-编写器方法,而是可以使用一个简单的 tasklet 将映射的内容写入文件。

选项 3:如果您确实想在步骤 2 中使用读取器-处理器-编写器方法,请编写您自己的读取器来迭代您的映射。

类似的东西(我没有测试该代码):

public class MapReader implements ItemReader {

     private MapContainer container;
     private Iterator<Map.Entry<Date, Integer> mapIterator;

     @PostConstruct
     public void afterPropertiesSet() {
        Assert.notNull(container);
        iterator = container.getMap().entry().iterator;
     }

     public void setMapContainer(MapContainer container) {
         this.container = container;
     }

     public Map.Entry<Date, Integer> read() {
        if (iterator.hasNext()) {
           return iterator.next();
        }
        return null;
      }
}

@Component
public class MapContainer {
    private Map<Date, Integer> data = new Hashmap<>();

    public Map<Date, Integer> getMap() {
        return data;
    }

    // add modifier method as needed for step 1

}
Run Code Online (Sandbox Code Playgroud)

因此,您为容器创建一个 spring-bean 实例,将其注入到步骤 2 的处理器中,在那里填充,同时将其注入到上面的阅读器中。