I have a Google BigQuery table, and I would like to stream the entire table into a Pub/Sub topic.
What would be the easiest way to do this?
Thanks in advance,
2019 update:
Now it's really easy, with a click-to-BigQuery option in Pub/Sub:
Find it on: https://console.cloud.google.com/cloudpubsub/topicList
The easiest way I know of is going through Google Cloud Dataflow, which natively knows how to access BigQuery and Pub/Sub.
In theory it should be as easy as the following Python lines:
p = beam.Pipeline(options=pipeline_options)

tablerows = p | 'read' >> beam.io.Read(
    beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))

tablerows | 'write' >> beam.io.Write(
    beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
This combination of Python/Dataflow/BigQuery/PubSub doesn't work today (Python Dataflow is in beta, but keep an eye on the changelog).
The same can be done with Java, and it works well - I just tested it. It runs both locally and in the hosted Dataflow runner:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
    BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();
Test if the messages are there with:
gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
Hosted Dataflow screenshot:
It really depends on the size of the table.

If it's a small table (a few thousand records, a couple dozen columns), you could set up a process to query the entire table, convert the response into a JSON array, and push that to Pub/Sub.

If it's a big table (millions/billions of records, hundreds of columns), you'd have to export it to files first, then prepare/ship those to Pub/Sub.

It also depends on your partitioning policy - if your table is set up to partition by date, you might, again, be able to query instead of export.

Last but not least, it also depends on frequency - is this a one-time deal (then export) or a continuous process (then use table decorators to query only the latest data)?

More information is needed if you want a truly helpful answer.
Edit
Based on your comment about the table size, I think the best way would be to write a script that would:

Export the table to GCS as newline-delimited JSON

Process the file (read line by line) and send to Pub/Sub

There are client libraries for most programming languages. I've done similar things with Python, and it's fairly straightforward.