小编hmm*_*rst的帖子

如何捕获 BigQueryIO.Write 抛出的任何异常并挽救输出失败的数据?

我想从 Cloud Pub/Sub 读取数据并使用 Cloud Dataflow 将其写入 BigQuery。每个数据都包含一个表 ID,数据本身将保存在其中。

写入 BigQuery 失败的因素有多种:

  • 表 ID 格式错误。
  • 数据集不存在。
  • 数据集不允许管道访问。
  • 网络故障。

当其中一个失败发生时,流式作业将重试该任务并停止。我尝试使用WriteResult.getFailedInserts()以挽救坏数据并避免停顿,但效果不佳。有什么好办法吗?

这是我的代码:

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  public class MyData implements Serializable {
    String table_id;
  }

  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    ValueProvider<String> getInputTopic();
    void setInputTopic(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<MyData> input = …
Run Code Online (Sandbox Code Playgroud)

google-bigquery google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1505
查看次数