针对 Dataflow 中的“表”加入流

Dan*_*eck 4 google-cloud-platform google-cloud-dataflow

让我用一个稍微做作的例子来解释我想要做什么。想象一下,我有一系列交易,包括股票代码、股票数量和价格:{ symbol = "GOOG", count = 30, price = 200 }。我想用股票的名称来丰富这些事件,在这种情况下"Google"

为此,我想在 Dataflow 内部维护一个由 a 更新的符号->名称映射“表” PCollection<KV<String, String>>,并将我的交易流与该表连接起来,产生例如 a PCollection<KV<Trade, String>>

这似乎是流处理应用程序的一个非常基础的用例,但我很难弄清楚如何在 Dataflow 中实现这一点。我知道在 Kafka Streams 中这是可能的。

请注意,我希望使用外部数据库的查询-我需要解决内部数据流这个问题,或者切换到卡夫卡流。

Ben*_*ers 5

我将描述两个选项。一个使用侧输入,它应该与当前版本的 Dataflow (1.X) 一起使用,另一个使用 aDoFn中的状态,它应该是即将到来的 Dataflow (2.X) 的一部分。

Dataflow 1.X 的解决方案,使用侧输入

这里的一般想法是使用映射值的侧输入使符号->名称映射对所有工作人员可用。

这个表需要在全局窗口中(所以没有任何东西会过时),需要每个元素都被触发(或者你想要产生新更新的频率),并在所有触发中累积元素。还需要一些逻辑来为每个符号取最新的名称。

该解决方案的缺点是每次有新条目进入时都会重新生成整个查找表,并且不会立即将其推送给所有工作人员。相反,每个人都将在未来“某个时候”获得新的映射。

在较高级别,此管道可能类似于(我尚未测试此代码,因此可能存在某些类型):

PCollection<KV<Symbol, Name>> symbolToNameInput = ...;
final PCollectionView<Map<Symbol, Iterable<Name>>> symbolToNames = symbolToNameInput
  .apply(Window.into(GlobalWindows.of())
      .triggering(Repeatedly.forever(AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(5)))
      .accumulatingFiredPanes())
  .apply(View.asMultiMap())
Run Code Online (Sandbox Code Playgroud)

请注意,我们必须在viewAsMultiMap这里使用。这意味着我们实际上为每个符号建立了所有名称。当我们查找内容时,我们需要确保在可迭代对象中使用最新的名称。

PCollection<Detail> symbolDetails = ...;
symbolDetails
  .apply(ParDo.withSideInputs(symbolToNames).of(new DoFn<Detail, AugmentedDetails>() {
    @Override
    public void processElement(ProcessContext c) {
      Iterable<Name> names = c.sideInput(symbolToNames).get(c.element().symbol());
      Name name = chooseName(names);
      c.output(augmentDetails(c.element(), name)); 
    }
  }));
Run Code Online (Sandbox Code Playgroud)

Dataflow 2.X 的解决方案,使用 State API

此解决方案使用了一项新功能,该功能将成为即将发布的 Dataflow 2.0 版本的一部分。它尚未包含在预览版本(当前为 Dataflow 2.0-beta1)中,但您可以查看发行说明以了解它何时可用。

一般的想法是键控状态允许我们存储一些与特定键相关联的值。在这种情况下,我们将记住我们看到的最新“名称”值。

在运行 stateful 之前,DoFn我们要将每个元素包装到一个公共元素类型 (a NameOrDetails) 对象中。这将类似于以下内容:

// Convert SymbolToName entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> left = symbolToName
  .apply(ParDo.of(new DoFn<SymbolToName, KV<Symbol, NameOrDetails>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      SymbolToName e = c.element();
      c.output(KV.of(e.getSymbol(), NameOrDetails.name(e.getName())));
    }
  });

// Convert detailed entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> right = details
  .apply(ParDo.of(new DoFn<Details, KV<Symbol, NameOrDetails>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Details e = c.element();
      c.output(KV.of(e.getSymobl(), NameOrDetails.details(e)));
    }
});

// Flatten the two streams together
PCollectionList.of(left).and(right)
  .apply(Flatten.create())
  .apply(ParDo.of(new DoFn<KV<Symbol, NameOrDetails>, AugmentedDetails>() {
    @StateId("name")
    private final StateSpec<ValueState<String>> nameSpec = 
      StateSpecs.value(StringUtf8Coder.of());

    @ProcessElement
    public void processElement(ProcessContext c
      @StateId("name") ValueState<String> nameState) {
      NameOrValue e = c.element().getValue();
      if (e.isName()) {
        nameState.write(e.getName());
      } else {
      String name = nameState.read();
      if (name == null) {
        // Use symbol if we haven't received a mapping yet.
        name = c.element().getKey();
      }
     c.output(e.getDetails().withName(name));
    }
  });
Run Code Online (Sandbox Code Playgroud)