GCP Dataflow Apache Beam 写入输出错误处理

Ric*_*rdB 2 google-cloud-platform google-cloud-dataflow apache-beam

我需要对数据流应用错误处理,以便使用相同的主键多次插入 Spanner。逻辑是,在当前消息之后可能会收到较旧的消息,并且我不想覆盖保存的值。因此,我将创建我的突变作为插入,并在尝试重复插入时抛出错误。

我在 DoFn 中看到过几个尝试块的示例,它们写入侧面输出以记录任何错误。这是一个非常好的解决方案,但我需要对写入不包含 DoFn 的 Spanner 的步骤应用错误处理

spannerBranchTuples2.get(spannerOutput2)
    .apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))                      
    .apply("Write Spanner Records", SpannerIO.write()
        .withInstanceId(options.getSpannerInstanceId())                  
        .withDatabaseId(options.getSpannerDatabaseId())
        .grouped());
Run Code Online (Sandbox Code Playgroud)

我还没有找到任何允许将错误处理应用于此步骤的文档,或者找到一种将其重写为 DoFn 的方法。有什么建议如何对此应用错误处理吗?谢谢

Pab*_*blo 5

数据流文档中有一个有趣的模式。

\n

DoFn基本上,这个想法是在将结果发送到您的写作转换之前进行。它看起来像这样:

\n
    final TupleTag<Output> successTag = new TupleTag<>() {};\n    final TupleTag<Input> deadLetterTag = new TupleTag<>() {};\n    PCollection<Input> input = /* \xe2\x80\xa6 */;\n    PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {\n      @Override\n      void processElement(ProcessContext c) {\n      try {\n        c.output(process(c.element());\n      } catch (Exception e) {\n        LOG.severe("Failed to process input {} -- adding to dead letter file",\n          c.element(), e);\n        c.sideOutput(deadLetterTag, c.element());\n      }\n    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));\n    \n    outputTuple.get(deadLetterTag)\n      .apply(/* Write to a file or table or anything */);\n\n    outputTuple.get(successTag)\n      .apply(/* Write to Spanner or any other sink */);\n
Run Code Online (Sandbox Code Playgroud)\n

让我知道这是否有用!

\n