后期数据处理 | 阿帕奇梁

Sun*_*nil 2 apache-beam

已经错过了窗口和晚期数据.withAllowedLateness周期从管道作为记录下车这里

我对这种行为有几个问题:

  1. 如何处理从管道中丢失的后期数据?我们可以添加默认行为吗?说所有迟到的数据都应该记录在像包罗万象的桶之类的地方?
  2. 我们能否有一个指标(Google Dataflow Metrics/Beam)来说明这些消息中有多少由于巨大的延迟而从管道中丢失?

Gui*_*ins 5

  1. 一般来说,我们将延迟数据定义为元素,当它们到达时,我们只是更愿意删除它们并且不想进一步处理。据我所知,添加额外的功能来处理这些消息需要大量的努力来修改 Java SDK。但是,如果您只想记录它们,这是由LateDataDroppingDoFnRunner代码完成的,它负责从过期窗口中删除数据:
for (WindowedValue<InputT> input : concatElements) {
  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
  if (canDropDueToExpiredWindow(window)) {
    // The element is too late for this window.
    droppedDueToLateness.inc();
    WindowTracing.debug(
        "{}: Dropping element at {} for key:{}; window:{} "
            + "since too far behind inputWatermark:{}; outputWatermark:{}",
        LateDataFilter.class.getSimpleName(),
        input.getTimestamp(),
        key,
        window,
        timerInternals.currentInputWatermarkTime(),
        timerInternals.currentOutputWatermarkTime());
  }
}
Run Code Online (Sandbox Code Playgroud)

请注意,日志具有DEBUG级别,因此您可能看不到它。正如解释这里,覆盖在数据流的水平,就可以使用--defaultWorkerLogLevel=DEBUG,或者甚至更好,指定特定的类如--workerLogLevelOverrides={"org.apache.beam.sdk.util.WindowTracing":"DEBUG"}。明智地选择您的密钥有助于公开信息以识别丢弃的消息(即数据沿袭)。

  1. 正如在前面的片段中可以看到的,droppedDueToLateness是一个 Counter 指标,每次我们删除一个元素时它都会增加:droppedDueToLateness.inc();。您可以使用 Stackdriver 和资源类型dataflow_job和指标来监控它custom.googleapis.com/dataflow/droppedDueToLateness

在此处输入图片说明