Ric*_*rdB 2 google-cloud-platform google-cloud-dataflow apache-beam
我需要对数据流应用错误处理,以便使用相同的主键多次插入 Spanner。逻辑是,在当前消息之后可能会收到较旧的消息,并且我不想覆盖保存的值。因此,我将创建我的突变作为插入,并在尝试重复插入时抛出错误。
我在 DoFn 中看到过几个尝试块的示例,它们写入侧面输出以记录任何错误。这是一个非常好的解决方案,但我需要对写入不包含 DoFn 的 Spanner 的步骤应用错误处理
spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());
Run Code Online (Sandbox Code Playgroud)
我还没有找到任何允许将错误处理应用于此步骤的文档,或者找到一种将其重写为 DoFn 的方法。有什么建议如何对此应用错误处理吗?谢谢
数据流文档中有一个有趣的模式。
\nDoFn基本上,这个想法是在将结果发送到您的写作转换之前进行。它看起来像这样:
final TupleTag<Output> successTag = new TupleTag<>() {};\n final TupleTag<Input> deadLetterTag = new TupleTag<>() {};\n PCollection<Input> input = /* \xe2\x80\xa6 */;\n PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {\n @Override\n void processElement(ProcessContext c) {\n try {\n c.output(process(c.element());\n } catch (Exception e) {\n LOG.severe("Failed to process input {} -- adding to dead letter file",\n c.element(), e);\n c.sideOutput(deadLetterTag, c.element());\n }\n }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));\n \n outputTuple.get(deadLetterTag)\n .apply(/* Write to a file or table or anything */);\n\n outputTuple.get(successTag)\n .apply(/* Write to Spanner or any other sink */);\nRun Code Online (Sandbox Code Playgroud)\n让我知道这是否有用!
\n| 归档时间: |
|
| 查看次数: |
2692 次 |
| 最近记录: |