Dan*_*eck 4 google-cloud-platform google-cloud-dataflow
让我用一个稍微做作的例子来解释我想要做什么。想象一下,我有一系列交易,包括股票代码、股票数量和价格:{ symbol = "GOOG", count = 30, price = 200 }。我想用股票的名称来丰富这些事件,在这种情况下"Google"。
为此,我想在 Dataflow 内部维护一个由 a 更新的符号->名称映射“表” PCollection<KV<String, String>>,并将我的交易流与该表连接起来,产生例如 a PCollection<KV<Trade, String>>。
这似乎是流处理应用程序的一个非常基础的用例,但我很难弄清楚如何在 Dataflow 中实现这一点。我知道在 Kafka Streams 中这是可能的。
请注意,我不希望使用外部数据库的查询-我需要解决内部数据流这个问题,或者切换到卡夫卡流。
我将描述两个选项。一个使用侧输入,它应该与当前版本的 Dataflow (1.X) 一起使用,另一个使用 aDoFn中的状态,它应该是即将到来的 Dataflow (2.X) 的一部分。
这里的一般想法是使用映射值的侧输入使符号->名称映射对所有工作人员可用。
这个表需要在全局窗口中(所以没有任何东西会过时),需要每个元素都被触发(或者你想要产生新更新的频率),并在所有触发中累积元素。还需要一些逻辑来为每个符号取最新的名称。
该解决方案的缺点是每次有新条目进入时都会重新生成整个查找表,并且不会立即将其推送给所有工作人员。相反,每个人都将在未来“某个时候”获得新的映射。
在较高级别,此管道可能类似于(我尚未测试此代码,因此可能存在某些类型):
PCollection<KV<Symbol, Name>> symbolToNameInput = ...;
final PCollectionView<Map<Symbol, Iterable<Name>>> symbolToNames = symbolToNameInput
.apply(Window.into(GlobalWindows.of())
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))
.accumulatingFiredPanes())
.apply(View.asMultiMap())
Run Code Online (Sandbox Code Playgroud)
请注意,我们必须在viewAsMultiMap这里使用。这意味着我们实际上为每个符号建立了所有名称。当我们查找内容时,我们需要确保在可迭代对象中使用最新的名称。
PCollection<Detail> symbolDetails = ...;
symbolDetails
.apply(ParDo.withSideInputs(symbolToNames).of(new DoFn<Detail, AugmentedDetails>() {
@Override
public void processElement(ProcessContext c) {
Iterable<Name> names = c.sideInput(symbolToNames).get(c.element().symbol());
Name name = chooseName(names);
c.output(augmentDetails(c.element(), name));
}
}));
Run Code Online (Sandbox Code Playgroud)
此解决方案使用了一项新功能,该功能将成为即将发布的 Dataflow 2.0 版本的一部分。它尚未包含在预览版本(当前为 Dataflow 2.0-beta1)中,但您可以查看发行说明以了解它何时可用。
一般的想法是键控状态允许我们存储一些与特定键相关联的值。在这种情况下,我们将记住我们看到的最新“名称”值。
在运行 stateful 之前,DoFn我们要将每个元素包装到一个公共元素类型 (a NameOrDetails) 对象中。这将类似于以下内容:
// Convert SymbolToName entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> left = symbolToName
.apply(ParDo.of(new DoFn<SymbolToName, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
SymbolToName e = c.element();
c.output(KV.of(e.getSymbol(), NameOrDetails.name(e.getName())));
}
});
// Convert detailed entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> right = details
.apply(ParDo.of(new DoFn<Details, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Details e = c.element();
c.output(KV.of(e.getSymobl(), NameOrDetails.details(e)));
}
});
// Flatten the two streams together
PCollectionList.of(left).and(right)
.apply(Flatten.create())
.apply(ParDo.of(new DoFn<KV<Symbol, NameOrDetails>, AugmentedDetails>() {
@StateId("name")
private final StateSpec<ValueState<String>> nameSpec =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void processElement(ProcessContext c
@StateId("name") ValueState<String> nameState) {
NameOrValue e = c.element().getValue();
if (e.isName()) {
nameState.write(e.getName());
} else {
String name = nameState.read();
if (name == null) {
// Use symbol if we haven't received a mapping yet.
name = c.element().getKey();
}
c.output(e.getDetails().withName(name));
}
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
472 次 |
| 最近记录: |