Har*_*eem 5 java state google-cloud-dataflow apache-beam
所以我已经阅读了梁的状态处理和及时处理文章,并发现了实现这些功能的问题.
我试图解决的问题与此类似,为每一行生成一个顺序索引.因为我希望能够将数据流生成的行引用到原始源的行.
public static class createIndex extends DoFn<String, KV<String, String>> {
@StateId("count")
private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());
@ProcessElement
public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState) {
String val = c.element();
long count = 0L;
if(countState.read() != null)
count = countState.read();
count = count + 1;
countState.write(count);
c.output(KV.of(String.valueOf(count), val));
}
}
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
.apply(ParDo.of(new createIndex()));
Run Code Online (Sandbox Code Playgroud)
我按照我在网上找到的任何内容查看了ParDo的原始源代码,但不确定需要做什么.我得到的错误是:
java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.
Run Code Online (Sandbox Code Playgroud)
我意识到这是一个简单的问题,但由于缺乏足够的示例或文档,我无法解决问题.我很感激任何帮助.谢谢!
好的,所以我继续研究这个问题并阅读一些源代码,并且能够解决问题.事实证明,输入ParDo.of(new DoFn())需要输入为a KV<T,U>.因此,为了读取文件并为每一行创建索引,我需要通过Key Value Pair对象传递它.下面我添加了代码:
public static class FakeKvPair extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}
Run Code Online (Sandbox Code Playgroud)
并将管道更改为:
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
.apply(ParDo.of(new FakeKvPair()))
.apply(ParDo.of(new createIndex()));
Run Code Online (Sandbox Code Playgroud)
出现的新问题是,在我的计数中是否保留了行的顺序,因为我正在运行额外的ParDo函数,这可能会改变被馈送到的行的顺序createIndex().
在我的本地机器上保留订单,但我不知道它将如何扩展到Dataflow.但我会问这是一个不同的问题.
| 归档时间: |
|
| 查看次数: |
981 次 |
| 最近记录: |