Hen*_*o12 5 google-bigquery google-cloud-pubsub google-cloud-dataflow apache-beam
我已经从Google 的 github 存储库中提取了Pub/Sub 到 BigQuery Dataflow 模板的副本。我使用direct-runner在我的本地机器上运行它。
在测试中,我确认模板仅在 UDF 处理或从 JSON 到 TableRow 转换期间发生错误时才将失败写入“死信”表。
我还希望通过将它们发送到单独的 TupleTag 来更优雅地处理在插入 BigQuery 时发生的故障,以便它们也可以发送到死信表或其他输出进行审查和处理。目前,当使用dataflow-runner执行时,这些错误只会写入 Stackdriver 日志,并会继续无限期地重试,直到问题得到解决。
问题一:在本地测试并发布格式与目标表模式不匹配的消息时,插入被重试 5 次,然后管道崩溃,出现 RuntimeException 以及从 HTTP 响应返回到 Google API 的错误。我相信这种行为是在BigQueryServices.Impl中设置的 :
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
Run Code Online (Sandbox Code Playgroud)
但是,根据Google 的文档,
“在流模式下运行时,包含失败项目的包将无限期重试,这可能会导致您的管道永久停止。”
作为 Beam 的Pub/Sub.IO,
创建和使用无界 PCollections
我的印象是从 Pub/Sub 读取时应该默认启用流模式。我什至在对 writeTableRows() 的调用中添加了 Streaming_Inserts 方法,但它并没有影响这种行为。
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
Run Code Online (Sandbox Code Playgroud)
问题二:
我问是因为我不知道如何在不创建自己的静态类的情况下捕获与插入相关的错误,该类覆盖了 expand 方法并使用 ParDo 和 DoFn,我可以在其中添加自己的自定义逻辑来为成功记录和失败记录创建单独的 TupleTags ,类似于在JavascriptTextTransformer 中为 FailsafeJavascriptUdf完成此操作的方式。
更新:
public static PipelineResult run(DirectOptions options) {
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<PubsubMessage, String> coder =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
PCollectionTuple transformOut =
pipeline
//Step #1: Read messages in from Pub/Sub
.apply(
"ReadPubsubMessages",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
//Step #2: Transform the PubsubMessages into TableRows
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
WriteResult writeResult = null;
try {
writeResult =
transformOut
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to("myproject:MyDataSet.MyTable"));
} catch (Exception e) {
System.out.print("Cause of the Standard Insert Failure is: ");
System.out.print(e.getCause());
}
try {
writeResult
.getFailedInserts()
.apply(
"WriteFailedInsertsToDeadLetter",
BigQueryIO.writeTableRows()
.to(options.getOutputDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
} catch (Exception e) {
System.out.print("Cause of the Error Insert Failure is: ");
System.out.print(e.getCause());
}
PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
.and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
Run Code Online (Sandbox Code Playgroud)
错误:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
在最新版本的 Beam 中,BigQueryIO.Write转换返回一个WriteResult对象,该对象使您能够检索输出到 BigQuery 失败的 TableRows 的 PCollection。使用它,您可以轻松检索失败,在死信输出的结构中对其进行格式化,然后将记录重新提交到 BigQuery。这消除了对单独的类来管理成功和失败记录的需要。
以下是您的管道可能是什么样子的示例。
// Attempt to write the table rows to the output table.
WriteResult writeResult =
pipeline.apply(
"WriteRecordsToBigQuery",
BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
/*
* 1) Get the failed inserts
* 2) Transform to the deadletter table format.
* 3) Output to the deadletter table.
*/
writeResult
.getFailedInserts()
.apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
.apply(
"WriteFailedInsertsToDeadletter",
BigQueryIO.writeTableRows()
.to(options.getDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
Run Code Online (Sandbox Code Playgroud)
此外,回答您的问题:
streaming
选项true。| 归档时间: |
|
| 查看次数: |
3014 次 |
| 最近记录: |