我需要对数据流应用错误处理,以便使用相同的主键多次插入 Spanner。逻辑是,在当前消息之后可能会收到较旧的消息,并且我不想覆盖保存的值。因此,我将创建我的突变作为插入,并在尝试重复插入时抛出错误。
我在 DoFn 中看到过几个尝试块的示例,它们写入侧面输出以记录任何错误。这是一个非常好的解决方案,但我需要对写入不包含 DoFn 的 Spanner 的步骤应用错误处理
spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());
Run Code Online (Sandbox Code Playgroud)
我还没有找到任何允许将错误处理应用于此步骤的文档,或者找到一种将其重写为 DoFn 的方法。有什么建议如何对此应用错误处理吗?谢谢