小编Har*_*eem的帖子

Apache Beam中有状态处理的问题

所以我已经阅读了梁的状态处理及时处理文章,并发现了实现这些功能的问题.

我试图解决的问题与类似,为每一行生成一个顺序索引.因为我希望能够将数据流生成的行引用到原始源的行.

public static class createIndex extends DoFn<String, KV<String, String>> {
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState)  {

    String val = c.element();
    long count = 0L;
    if(countState.read() != null)
      count = countState.read();

    count = count + 1;
    countState.write(count);

    c.output(KV.of(String.valueOf(count), val));

  }
}

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new createIndex()));
Run Code Online (Sandbox Code Playgroud)

我按照我在网上找到的任何内容查看了ParDo的原始源代码,但不确定需要做什么.我得到的错误是:

java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers. …
Run Code Online (Sandbox Code Playgroud)

java state google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
981
查看次数

标签 统计

apache-beam ×1

google-cloud-dataflow ×1

java ×1

state ×1