使用 GlobalWindow 在 Beam 中进行状态垃圾收集

pea*_*eay 3 google-cloud-dataflow apache-flink apache-beam

Apache Beam 最近通过注释引入了状态单元,并在Apache Flink 和 Google Cloud Dataflow 中提供了部分支持。StateSpec@StateId

我找不到任何有关将其与GlobalWindow. 特别是,有没有一种方法可以拥有“状态垃圾收集”机制,以根据某些配置摆脱一段时间未见过的键的状态,同时仍然保持已见过的键的单个历史状态足够频繁吗?

或者,在这种情况下使用的状态量是否会出现偏差,无法回收与一段时间未见过的键相对应的状态?

我还对 Apache Flink 或 Google Cloud Dataflow 是否支持潜在的解决方案感兴趣。

Flink 和 direct runner 似乎有一些“状态 GC”的代码,但我不太确定它的作用以及它在使用全局窗口时是否相关。

Ken*_*les 5

状态可以在窗口过期后的某个时刻由 Beam 运行程序自动进行垃圾收集 - 当输入水印超出窗口末尾允许的延迟时,因此所有进一步的输入都是可删除的。确切的细节取决于跑步者。

正如您正确确定的那样,全局窗口可能永远不会过期。那么这个状态的自动收集就不会被调用。对于有界数据,包括耗尽场景,它实际上会过期,但对于永久无界数据源,它不会。

如果您在全局窗口中对此类数据进行状态处理,则可以使用用户定义的计时器(通过@TimerId@OnTimerTimerSpec- 我还没有在博客中介绍这些)在您选择的超时后清除状态。如果状态表示某种聚合,那么您无论如何都需要一个计时器来确保您的数据不会滞留在状态中。

这是它们的使用的一个简单示例:

new DoFn<Foo, Baz>() {

  private static final String MY_TIMER = "my-timer";
  private static final String MY_STATE = "my-state";

  @StateId(MY_STATE)
  private final StateSpec<ValueState<Bizzle>> =
      StateSpec.value(Bizzle.coder());

  @TimerId(MY_TIMER)
  private final TimerSpec myTimer =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState,
      @TimerId(MY_TIMER) Timer myTimer) {
    bizzleState.write(...);
    myTimer.setForNowPlus(...);
  }

  @OnTimer(MY_TIMER)
  public void onMyTimer(
      OnTimerContext context,
      @StateId(MY_STATE) ValueState<Bizzle> bizzleState) {
    context.output(... bizzleState.read() ...);
    bizzleState.clear();
  }
}
Run Code Online (Sandbox Code Playgroud)