Streaming a BigQuery table into Google Pub/Sub

MoS*_*She 5 google-cloud-storage google-bigquery google-cloud-platform google-cloud-pubsub google-cloud-dataflow

I have a Google BigQuery table, and I want to stream the whole table into a Pub/Sub topic.

What would be the easy way to do this?

Thanks in advance,

Fel*_*ffa 8

2019 update:

Now it's really easy, with a click-to-BigQuery option in Pub/Sub.


Find it on: https://console.cloud.google.com/cloudpubsub/topicList


The easiest way I know of is going through Google Cloud Dataflow, which natively knows how to access BigQuery and Pub/Sub.

In theory it should be as easy as the following Python lines:

p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))

This combination of Python/Dataflow/BigQuery/PubSub doesn't work today (Python Dataflow is in beta, but keep an eye on the changelog).
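On current Apache Beam Python SDKs the equivalent pipeline can be written with beam.io.ReadFromBigQuery and beam.io.WriteToPubSub. A minimal sketch, not from the original answer, reusing the table and topic names from above (note the Dataflow runner only supports Pub/Sub writes in streaming pipelines, so a one-shot copy like this is easiest to try on the DirectRunner):

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # set runner/project/temp_location for your environment
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> beam.io.ReadFromBigQuery(
           table='clouddataflow-readonly:samples.weather_stations')
     # ReadFromBigQuery yields one dict per row; Pub/Sub expects bytes.
     | 'to bytes' >> beam.Map(lambda row: json.dumps(row, default=str).encode('utf-8'))
     | 'write' >> beam.io.WriteToPubSub(
           topic='projects/fh-dataflow/topics/bq2pubsub-topic'))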

We can do the same with Java, and it works well - I just tested it. It runs both locally and on the hosted Dataflow runner:

// Imports, assuming the Dataflow SDK 1.x (com.google.cloud.dataflow) this answer was written against:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read every row of the public weather_stations sample table.
PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// Stringify each row and publish it to the Pub/Sub topic.
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

Test if the messages are there with:

gcloud --project myproject beta pubsub subscriptions  pull --auto-ack sub1
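The pubsub commands have since graduated out of beta; with current gcloud releases the same check is:

gcloud --project myproject pubsub subscriptions pull --auto-ack sub1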

[Hosted Dataflow screenshot: the pipeline at work]


Gil*_*rim 3

It really depends on the size of the table.

If it's a small table (a few thousand records, a couple dozen columns), you can set up a process to query the whole table, convert the response into a JSON array, and push it to Pub/Sub.
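For that small-table route, a minimal sketch in Python, assuming the google-cloud-bigquery and google-cloud-pubsub client libraries (the project, table, and topic names are placeholders):

import json
from google.cloud import bigquery, pubsub_v1

bq = bigquery.Client(project='my-project')
publisher = pubsub_v1.PublisherClient()
topic = publisher.topic_path('my-project', 'my-topic')

# Query the whole table and publish one JSON message per row;
# default=str takes care of dates, timestamps, and decimals.
for row in bq.query('SELECT * FROM `my-project.my_dataset.my_table`').result():
    publisher.publish(topic, json.dumps(dict(row), default=str).encode('utf-8'))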

If it's a big table (millions or billions of records, hundreds of columns), you'd have to export it to a file first, then prepare and send it to Pub/Sub.

It also depends on your partitioning strategy - if your table is partitioned by date, you may again be able to query instead of export.

Last but not least, it also depends on frequency - is this a one-time deal (then export), or an ongoing process (then use table decorators to query only the latest data)?

Some more information is needed if you want a truly useful answer.

Edit:

Based on your comments about the size of the table, I think the best way would be to write a script that:

  1. Exports the table to GCS as newline-delimited JSON

  2. Processes the file (reading it line by line) and sends each line to Pub/Sub

There are client libraries for both steps in most programming languages. I've done similar things with Python, and it's fairly straightforward.
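A minimal sketch of such a script, assuming the google-cloud-bigquery, google-cloud-storage, and google-cloud-pubsub client libraries (every project, bucket, and topic name below is a placeholder):

from google.cloud import bigquery, pubsub_v1, storage

# Step 1: export the table to GCS as newline-delimited JSON.
# For tables over 1 GB, BigQuery requires a wildcard URI (e.g. rows-*.json).
bq = bigquery.Client(project='my-project')
extract_config = bigquery.ExtractJobConfig(
    destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON)
bq.extract_table('my-project.my_dataset.my_table',
                 'gs://my-bucket/export/rows.json',
                 job_config=extract_config).result()

# Step 2: stream the exported file line by line and publish each row.
publisher = pubsub_v1.PublisherClient()
topic = publisher.topic_path('my-project', 'my-topic')
blob = storage.Client().bucket('my-bucket').blob('export/rows.json')
futures = [publisher.publish(topic, line.strip().encode('utf-8'))
           for line in blob.open('r') if line.strip()]
for f in futures:
    f.result()  # block until every message has been accepted by Pub/Sub

Publishing returns futures, so the rows go out concurrently; the final loop just waits until all of them have been sent.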