使用 Apache Beam 按键处理事件的总排序

mar*_*ark 5 google-cloud-dataflow apache-beam

问题背景

我试图从实时流中生成每个键的事件项的总(线性)顺序,其中顺序是事件时间(从事件有效负载派生)。

方法

我曾尝试使用流来实现这一点,如下所示:

1) 设置不重叠的顺序窗口,例如持续时间 5 分钟

2)建立一个允许的迟到——丢弃迟到的事件是可以的

3) 设置累积模式以保留所有已触发的窗格

4) 使用“AfterwaterMark”触发器

5) 处理触发窗格时,仅考虑最后一个窗格

6) 使用 GroupBy.perKey 确保此窗口中此键的所有事件将作为单个资源上的一个单元进行处理

虽然这种方法确保给定窗口内每个键的线性顺序,但它并不能保证跨多个窗口,例如,可能有一个键的事件窗口,发生在与较早窗口同时处理之后,如果第一个窗口失败并且必须重试,这很容易发生。

我正在考虑采用这种方法,首先可以处理实时流,以便它按键对事件进行分区,并将它们写入以窗口范围命名的文件中。由于光束处理的并行特性,这些文件也会乱序生成。然后,单个流程协调器可以将这些文件按顺序提交到批处理管道 - 只有在收到前一个文件并且其下游处理已成功完成时才提交下一个文件。

问题是 Apache Beam 只会在该时间窗口中至少有一个时间元素时触发窗格。因此,如果事件中存在间隙,则生成的文件中可能存在间隙 - 即丢失的文件。丢失文件的问题在于,协调批处理器无法区分时间窗口是否已经过去而没有数据,或者是否出现故障,在这种情况下,直到文件最终到达它才能继续进行。

强制事件窗口触发的一种方法可能是以某种方式将虚拟事件添加到每个分区和时间窗口的流中。然而,这很难做到……如果时间序列中有很大的差距,那么如果这些虚拟事件发生在很晚的事件周围,那么它们将被视为迟到而被丢弃。

是否有其他方法可以确保每个可能的事件窗口都有触发器,即使这会导致输出空文件?

从实时流中按键生成总排序是 Apache Beam 的一个容易处理的问题吗?我应该考虑另一种方法吗?

Ken*_*les 8

根据您对易处理的定义,当然可以在 Apache Beam 中按事件时间戳对每个键的流进行完全排序。

以下是设计背后的考虑因素:

  1. Apache Beam 不保证按顺序传输,因此在管道内没有用。因此,我假设您正在执行此操作,以便您可以写入外部系统,并且只有在它们出现时才能处理事物。
  2. 如果一个事件有时间戳t,你永远不能确定没有更早的事件会到达,除非你等到t是可丢弃的。

所以这是我们将如何做到的:

  1. 我们将在全局窗口中编写一个ParDo使用状态和计时器的程序(博客文章仍在审核中)。这使其成为每个键的工作流程。
  2. 我们将在元素到达时缓冲它们的状态。因此,您允许的延迟会影响您需要的数据结构的效率。你需要的是一个堆来查看和弹出最小时间戳和元素;没有内置的堆状态,所以我将它写为ValueState.
  3. 我们将设置一个事件时间计时器以在元素的时间戳不再矛盾时接收回调。

EventHeap为简洁起见,我将假设一个自定义数据结构。在实践中,您希望将其分解为多个状态单元以最小化传输的数据。堆可能是对原始状态类型的合理补充。

我还将假设我们需要的所有编码人员都已经注册并专注于状态和计时器逻辑。

new DoFn<KV<K, Event>, Void>() {

  @StateId("heap")
  private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value();

  @TimerId("next")
  private final TimerSpec nextTimerSpec = TimerSpec.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = firstNonNull(
      heapState.read(),
      EventHeap.createForKey(ctx.element().getKey()));
    heap.add(ctx.element().getValue());
    // When the watermark reaches this time, no more elements
    // can show up that have earlier timestamps
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  }

  @OnTimer("next")
  public void onNextTimestamp(
      OnTimerContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = heapState.read();
    // If the timer at time t was delivered the watermark must
    // be strictly greater than t
    while (!heap.nextTimestamp().isAfter(ctx.timestamp())) {
      writeToExternalSystem(heap.pop());
    }
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  }
}
Run Code Online (Sandbox Code Playgroud)

这应该可以让您开始走向任何您的底层用例。