我想从 Cloud Pub/Sub 读取数据并使用 Cloud Dataflow 将其写入 BigQuery。每个数据都包含一个表 ID,数据本身将保存在其中。
写入 BigQuery 失败的因素有多种:
当其中一个失败发生时,流式作业将重试该任务并停止。我尝试使用WriteResult.getFailedInserts()以挽救坏数据并避免停顿,但效果不佳。有什么好办法吗?
这是我的代码:
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public class MyData implements Serializable {
String table_id;
}
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<MyData> input = …Run Code Online (Sandbox Code Playgroud)