根据Apache Beam 2.0.0 SDK 文档 GroupIntoBatches仅适用于KV集合。
我的数据集只包含值,不需要引入键。然而,为了使用GroupIntoBatches我必须用一个空字符串作为键来实现“假”键:
static class FakeKVFn extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}
Run Code Online (Sandbox Code Playgroud)
所以整体管道如下所示:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
long batchSize = 100L;
p.apply("ReadLines", TextIO.read().from("./input.txt"))
.apply("FakeKV", ParDo.of(new FakeKVFn()))
.apply(GroupIntoBatches.<String, String>ofSize(batchSize))
.setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(callWebService(c.element().getValue()));
}
}))
.apply("WriteResults", TextIO.write().to("./output/"));
p.run().waitUntilFinish();
}
Run Code Online (Sandbox Code Playgroud)
有没有办法在不引入“假”密钥的情况下分组?
需要提供 KV 输入,GroupIntoBatches因为转换是使用状态和计时器实现的,它们是每个键和窗口的。
对于每个键+窗口对,状态和计时器必须串行执行(或可观察到)。您必须通过提供键(和窗口,尽管我知道今天没有在窗口上并行化的运行器)来手动表达可用的并行性。两种最常见的方法是:
GroupIntoBatches实际有用的数据。在您的代码段中向所有元素添加一个虚拟键将导致转换根本无法并行执行。这类似于有状态索引导致 ParDo 在 Dataflow Runner 上单线程运行的讨论。
| 归档时间: |
|
| 查看次数: |
2314 次 |
| 最近记录: |