Creating/writing to a partitioned BigQuery table via Google Cloud Dataflow

ptf*_*ptf 8 google-bigquery google-cloud-dataflow apache-beam-io

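I'd like to take advantage of the new BigQuery time-partitioned tables feature, but I'm not sure whether this is possible in version 1.6 of the Dataflow SDK.

Looking at the BigQuery JSON API, to create a partitioned table you need to pass in a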

"timePartitioning": { "type": "DAY" }

option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.
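
For reference, here is a minimal sketch of the request body that would pre-create such a table through the BigQuery `tables.insert` REST method. The project, dataset, and table IDs are placeholders, and the JSON is assembled by hand only to keep the example dependency-free; it does not escape the IDs.

```java
final class PartitionedTableRequest {

    /**
     * Builds the JSON body for a tables.insert call that creates a
     * day-partitioned table. The IDs are inserted verbatim (no escaping).
     */
    static String body(String projectId, String datasetId, String tableId) {
        return String.format(
            "{\"tableReference\": {\"projectId\": \"%s\", \"datasetId\": \"%s\", \"tableId\": \"%s\"}, "
                + "\"timePartitioning\": {\"type\": \"DAY\"}}",
            projectId, datasetId, tableId);
    }

    public static void main(String[] args) {
        // Placeholder IDs, for illustration only.
        System.out.println(body("my-project", "my_dataset", "my_table"));
    }
}
```

Once the table exists with `timePartitioning` set, the pipeline only has to address it (or one of its partitions) by name.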

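I thought that maybe I could pre-create the table and sneak in a partition decorator via a BigQueryIO.Write.toTableReference lambda? Has anyone else had success creating/writing partitioned tables via Dataflow?

This seems similar to setting the table expiration time, which isn't currently available either.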

Dan*_*rin 8

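As Pavan says, it is definitely possible to write to partitioned tables with Dataflow. Are you using the DataflowPipelineRunner in streaming mode or in batch mode?

The solution you proposed should work. Specifically, if you pre-create a table with date partitioning set up, then you can use a BigQueryIO.Write.toTableReference lambda to write to a date partition. For example: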

/**
 * A Joda-time formatter that prints a date in format like {@code "20160101"}.
 * Threadsafe.
 */
private static final DateTimeFormatter FORMATTER =
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
    String.format("%s$%s", baseTableName, FORMATTER.print(instant));
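
The same naming rule can be tried without the SDK or Joda-Time on the classpath. The sketch below swaps in `java.time` (an assumption, the snippet above uses Joda) and wraps the logic in a hypothetical `PartitionNames` helper, so the `$yyyyMMdd` decorator can be checked in isolation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class PartitionNames {

    // Formats an instant as yyyyMMdd in UTC. DateTimeFormatter is immutable and thread-safe.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Appends the partition decorator for the instant's UTC day to the table name. */
    static String partitionName(String baseTableName, Instant instant) {
        return baseTableName + "$" + DAY.format(instant);
    }

    public static void main(String[] args) {
        // prints "project:dataset.table$20160101"
        System.out.println(
            partitionName("project:dataset.table", Instant.parse("2016-01-01T12:00:00Z")));
    }
}
```

Because the formatter is pinned to UTC, an instant just before midnight UTC and one just after it land in different partitions regardless of the worker's local time zone.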

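  • This approach works very well, but it only allows controlling the date stamp with parameters from outside the pipeline. What if we want to use timestamps from the data itself to split by date and then write into the corresponding tables? (3 upvotes)
  • @nembleton: If the elements have timestamps, you can use windowing to map them into daily windows. Modify this code: `PCollection<Integer> windowedItems = items.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(10))));`. Then the TableSpecFun, which reads the window, will map the elements to the correct days. The code is from the [FixedWindows javadoc](https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows) (3 upvotes)
  • @JulianV.Modesto is right: if a table reference is provided, the 1.6 SDK switches to writing to BigQuery in streaming mode, and that does not allow table decorators yet (2 upvotes)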
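
To make the windowing suggestion concrete: in the quoted snippet the ten-minute duration would become a one-day one, for example `FixedWindows.of(Duration.standardDays(1))`. The sketch below reproduces the arithmetic of a fixed one-day window with zero offset in plain Java. `DailyWindows` is a hypothetical helper written for illustration, not SDK code.

```java
import java.time.Duration;
import java.time.Instant;

final class DailyWindows {

    static final long DAY_MILLIS = Duration.ofDays(1).toMillis();

    /** Start of the one-day window, aligned to UTC midnight, that contains the timestamp. */
    static Instant windowStart(Instant timestamp) {
        long millis = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, DAY_MILLIS));
    }

    /** Exclusive end of the same window. */
    static Instant windowEnd(Instant timestamp) {
        return windowStart(timestamp).plusMillis(DAY_MILLIS);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2016-03-15T17:45:00Z");
        // prints "2016-03-15T00:00:00Z .. 2016-03-16T00:00:00Z"
        System.out.println(windowStart(ts) + " .. " + windowEnd(ts));
    }
}
```

Every element whose timestamp falls on the same UTC day gets the same window start, which is exactly the value a table-naming function needs to pick the partition.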

Evg*_*ich 7

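The approach I took (it also works in streaming mode):

  • Define a custom window for the incoming records
  • Convert the window into the table/partition name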

    p.apply(PubsubIO.Read
                .subscription(subscription)
                .withCoder(TableRowJsonCoder.of())
            )
            .apply(Window.into(new TablePartitionWindowFn()) )
            .apply(BigQueryIO.Write
                           .to(new DayPartitionFunc(dataset, table))
                           .withSchema(schema)
                           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
    

Set the window based on the incoming data. The end Instant can be ignored, because only the start value is used to set the partition:

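import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.NonMergingWindowFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import java.util.Arrays;
import java.util.Collection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

    // Builds a window whose start is the day parsed from the record's "DTTM" field.
    private IntervalWindow assignWindow(AssignContext context) {
        TableRow source = (TableRow) context.element();
        String dttm_str = (String) source.get("DTTM");

        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();

        Instant start_point = Instant.parse(dttm_str, formatter);
        // The end is one second after the start; only the start is used for the partition.
        Instant end_point = start_point.withDurationAdded(1000, 1);

        return new IntervalWindow(start_point, end_point);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }

    @Override
    public IntervalWindow getSideInputWindow(BoundedWindow window) {
        if (window instanceof GlobalWindow) {
            throw new IllegalArgumentException(
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        // Side inputs are not used with this WindowFn, so no window is mapped.
        return null;
    }
}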

Set the table partition dynamically:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

    // Destination prefix, e.g. "dataset.table$"; the day suffix is appended per window.
    private final String destination;

    public DayPartitionFunc(String dataset, String table) {
        this.destination = dataset + "." + table + "$";
    }

    @Override
    public String apply(BoundedWindow boundedWindow) {
        // The cast below is safe because TablePartitionWindowFn only produces IntervalWindows.
        String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                         .withZone(DateTimeZone.UTC)
                                         .print(((IntervalWindow) boundedWindow).start());
        return destination + dayString;
    }
}
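
The mapping from a record's date field to its destination can be checked outside a pipeline. The following is a rough sketch under a few assumptions: `PartitionRouting` is a hypothetical helper, it uses `java.time` rather than Joda-Time, and it works on bare "DTTM" strings in `yyyy-MM-dd` form instead of `TableRow` objects.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionRouting {

    /** Maps a yyyy-MM-dd value to "dataset.table$yyyyMMdd". */
    static String destination(String dataset, String table, String dttm) {
        LocalDate day = LocalDate.parse(dttm); // ISO-8601 date, e.g. 2016-05-17
        return dataset + "." + table + "$" + day.format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    /** Groups the values by the partition they would be written to. */
    static Map<String, List<String>> route(String dataset, String table, List<String> dttmValues) {
        Map<String, List<String>> byPartition = new TreeMap<>();
        for (String dttm : dttmValues) {
            byPartition
                .computeIfAbsent(destination(dataset, table, dttm), k -> new ArrayList<>())
                .add(dttm);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Placeholder dataset and table names, for illustration only.
        System.out.println(
            route("my_dataset", "events", Arrays.asList("2016-05-17", "2016-05-18", "2016-05-17")));
    }
}
```

A malformed date makes `LocalDate.parse` throw, so in a real pipeline such records would need to be filtered or routed elsewhere before windowing.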

Is there a better way to achieve the same result?