Using the BigQuery Write API in Golang

Ale*_*int · 5 · tags: go, google-bigquery

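I'm trying to do streaming inserts into BigQuery from Golang using the new BigQuery Storage API. From this page, I understand that this API replaces the old BigQuery streaming insert API.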

However, none of the examples in the documentation show how to actually insert rows. To build an AppendRowsRequest, I've gotten as far as this:

&storagepb.AppendRowsRequest{
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows{
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows{
                SerializedRows: [][]byte{}, // serialized protocol buffer data??
            },
        },
    },
}

What data should go into the SerializedRows field above?

The storagepb.ProtoRows struct used above is documented here. Unfortunately, the documentation only links to the main Protocol Buffers overview page.

Can anyone give me an example of streaming rows from Golang into BigQuery with the new BigQuery Storage API?

Answer: Ale*_*int · 5

With help from the answer above, I got a working example. It is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example

The main code is:

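package main

import (
    "context"
    "fmt"
    "log"

    // BigQuery Storage Write API client and its protobuf request/response types
    // (older releases exposed storagepb as google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1)
    storage "cloud.google.com/go/bigquery/storage/apiv1"
    "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    // converts a message descriptor into the self-contained form the Write API expects
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    "google.golang.org/protobuf/proto"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)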

// the data we will stream to bigquery
var rows = []*Row{
    {Name: "John Doe", Age: 104},
    {Name: "Jane Doe", Age: 69},
    {Name: "Adam Smith", Age: 33},
}

func main() {
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

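    // get the response; a nil err only means the RPC itself succeeded,
    // so also check the per-request error carried inside the response
    res, err := stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }
    if rpcErr := res.GetError(); rpcErr != nil {
        log.Fatal("AppendRows error: ", rpcErr)
    }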

    log.Println("done")
}
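The answer sends all three rows in one AppendRowsRequest. For larger loads, each request has a size limit, so one option is to spread SerializedRows across several requests on the same stream. The sketch below shows one way to batch the already-serialized rows. `chunkRows` is a hypothetical helper that is not part of any BigQuery library. The byte budget is a parameter, not the documented limit, and the helper counts only row payloads, not the schema or other request overhead.

```go
package main

import "fmt"

// chunkRows (hypothetical helper) splits serialized rows into batches whose
// combined row payload stays at or below maxBytes. Each batch could become the
// SerializedRows of a separate AppendRowsRequest. A single row larger than
// maxBytes still gets a batch of its own; the server may reject it.
func chunkRows(rows [][]byte, maxBytes int) [][][]byte {
	var batches [][][]byte
	var cur [][]byte
	size := 0
	for _, r := range rows {
		// flush the current batch if adding this row would exceed the budget
		if len(cur) > 0 && size+len(r) > maxBytes {
			batches = append(batches, cur)
			cur = nil
			size = 0
		}
		cur = append(cur, r)
		size += len(r)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	rows := [][]byte{make([]byte, 4), make([]byte, 4), make([]byte, 4), make([]byte, 9)}
	for i, b := range chunkRows(rows, 8) {
		fmt.Println("batch", i, "rows:", len(b))
	}
}
```

With an 8-byte budget this prints three batches holding 2, 1 and 1 rows. The batching is greedy and preserves row order, which keeps the example simple.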

The protocol buffer definition of the Row type used above is:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
    string Name = 1;
    int32 Age = 2;
}
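Each element of SerializedRows is simply one Row message in protobuf wire format, which is what proto.MarshalOptions.Marshal produces in the answer. To make that concrete, the sketch below hand-encodes a Row using only the standard library, following the public protobuf encoding rules. `encodeRow` and `appendVarint` are illustrative helpers, not a replacement for generated code. For the generated Row type, the output should match what proto.Marshal emits.

```go
package main

import "fmt"

// appendVarint appends x using protobuf's base-128 varint encoding.
func appendVarint(b []byte, x uint64) []byte {
	for x >= 0x80 {
		b = append(b, byte(x)|0x80)
		x >>= 7
	}
	return append(b, byte(x))
}

// encodeRow hand-encodes the message `string Name = 1; int32 Age = 2;`.
// proto3 omits fields that hold their zero value.
func encodeRow(name string, age int32) []byte {
	var b []byte
	if name != "" {
		b = appendVarint(b, 1<<3|2) // tag: field 1, wire type 2 (length-delimited)
		b = appendVarint(b, uint64(len(name)))
		b = append(b, name...)
	}
	if age != 0 {
		b = appendVarint(b, 2<<3|0) // tag: field 2, wire type 0 (varint)
		// int32 is sign-extended to 64 bits, so negative values take 10 bytes
		b = appendVarint(b, uint64(int64(age)))
	}
	return b
}

func main() {
	fmt.Printf("% x\n", encodeRow("John Doe", 104)) // 0a 08 4a 6f 68 6e 20 44 6f 65 10 68
}
```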

First, you need to create a BigQuery dataset and a table whose schema matches the protocol buffer. The README in the repository linked above explains how to do this.

After running the code above, the data shows up in BigQuery like this:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

Thanks, everyone, for the help!