在BigQuery接收器中仅执行一次处理的情况下,改组是什么意思?

Mas*_*syB 5 dataflow google-bigquery apache-beam

我正在阅读一篇有关由某些Dataflow源和接收器实施的一次精确处理的文章,但在理解BigQuery接收器上的示例时遇到了麻烦。从文章

生成随机UUID是非确定性操作,因此在插入BigQuery之前,我们必须添加一个重新排列。完成此操作后,Cloud Dataflow进行的任何重试将始终使用改组后的相同UUID。重复插入BigQuery的尝试将始终具有相同的插入ID,因此BigQuery可以对其进行过滤

// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });
Run Code Online (Sandbox Code Playgroud)

改组是什么意思,它如何防止在后续重试中为同一插入生成不同的UUID?

Mic*_*tin 5

Reshuffle 以不同的方式对数据进行分组。然而,这里使用它是因为它的副作用:检查点和重复数据删除。

如果不进行重新洗牌,如果同一任务生成 UUID 并将数据插入 BigQuery,则存在工作线程重新启动的风险,新工作线程将生成新的 UUID 并向 BigQuery 发送不同的行,从而导致重复行。

Reshuffle操作将UUID生成和BigQuery插入分为两个步骤,并在它们之间插入检查点和重复数据删除。

  1. 首先,生成UUID并发送到reshuffle。如果重新启动 UUID 生成工作程序,则没有问题,因为重新洗牌会删除重复行,从而消除失败/重新启动的工作程序中的数据。
  2. 生成的 UUID 通过 shuffle 操作进行检查点。
  3. BigQuery 插入工作线程使用检查点 UUID,因此即使重新启动,它也会向 BigQuery 发送完全相同的数据。
  4. BigQuery 使用这些 UUID 对数据进行重复数据删除,因此重新启动的插入工作线程产生的重复数据将在 BigQuery 中消除。


Fel*_*ffa 2

我认为这篇文章很好地解释了为什么“重新洗牌”有助于从“至少一次”转变为“恰好一次”:

\n
\n

具体来说,窗口可能会尝试使用元素 e0、e1、e2 触发,但工作线程在提交窗口处理之前崩溃(但不会在这些元素作为副作用发送之前崩溃)。当工作进程重新启动时,窗口将再次触发,但现在会显示一个迟到的元素 e3。由于该元素在提交窗口之前出现,因此它\xe2\x80\x99不被算作延迟数据,因此使用元素e0、e1、e2、e3再次调用DoFn。然后将它们发送到副作用操作。幂等性在这里没有帮助,因为每次发送不同的逻辑记录集。

\n

还有其他方法可以引入非决定论。解决这一风险的标准方法是依靠这样一个事实:Cloud Dataflow 当前保证只有一个版本的 DoFn 输出可以使其通过洗牌边界这一事实。

\n
\n

您还可以查看 Reshuffle 的文档:

\n\n

其中有一条关于弃用此类的注释,因此BigQueryIO的未来实现可能会有所不同。

\n