如何使用 Dataflow 将 Google Pub/Sub 中的数据批量处理到 Cloud Storage?

Eka*_*sal 0 dataflow batch-processing google-cloud-storage google-bigquery google-cloud-pubsub

我正在构建一个从 MYSQL 数据库读取数据并在 BigQuery 中创建副本的变更数据捕获管道。我将在 Pub/Sub 中推送更改并使用 Dataflow 将它们传输到 Google Cloud Storage。我已经能够弄清楚如何流式传输更改,但是我需要对数据库中的几个表运行批处理。

在从 Pub/Sub 等无限源读取时,能否使用 Dataflow 运行批处理作业?我可以运行此批处理作业以将数据从 Pub/Sub 传输到 Cloud Storage,然后将此数据加载到 BigQuery 吗?我想要一个批处理作业,因为流作业成本更高。

gui*_*ere 6

谢谢你的精确。

首先,当您在 Dataflow(Beam 框架)中使用 PubSub 时,只能在流模式下使用

Cloud Pub/Sub 源和接收器目前仅在远程执行期间的流式管道中受支持。

如果您的流程不需要实时,您可以跳过 Dataflow 并节省资金。您可以将 Cloud Functions 或 Cloud Run 用于我建议您的流程(如果您愿意,也可以使用 App Engine,但不是我的第一个建议)。

在这两种情况下,创建一个由Cloud Scheduler定期(每周?)触发的流程(Cloud Run 或 Cloud Function)。

解决方案1

  • 将您的流程连接到请求订阅
  • 每次读取消息(或消息块,例如 1000)时,将流写入 BigQuery。-> 但是,流写入在 big Query 上不是免费的(每 Gb 0.05 美元)
  • 循环直到队列为空。将超时设置为最大值(使用 Cloud Function 为 9 分钟,使用 Cloud Run 为 15 分钟)以防止出现任何超时问题

解决方案2

  • 将您的流程连接到请求订阅
  • 读取一大块消息(例如 1000)并将它们写入内存(写入数组)。
  • 循环直到队列为空。将超时设置为最大值(使用 Cloud Function 为 9 分钟,使用 Cloud Run 为 15 分钟)以防止出现任何超时问题。还将内存设置为最大值 (2Gb) 以防止内存不足崩溃。
  • 从您的内存数据数组创建一个加载作业到 BigQuery。-> 此处加载作业是免费的,并且您每天和每个表最多只能加载 1000 个加载作业。

但是,如果您的应用程序 + 数据大小大于 ma 内存值,则此解决方案可能会失败。另一种方法是每隔一百万行(取决于每行的大小和内存占用)在 GCS 中创建一个文件。使用唯一前缀命名文件,例如当天的日期 (YYYYMMDD-tempFileXX),并在每个文件创建时增加 XX。然后,不是从内存中的数据创建加载作业,而是使用 GCS 中的数据,文件名中带有通配符 ( gs://myBucket/YYYYMMDD-tempFile*)。这样所有与前缀匹配的文件都将被加载。

建议PubSub 消息在 pubsub 订阅中最多保留 7 天。我建议您至少每 3 天触发一次流程,以便在将消息删除到订阅中之前有时间做出反应和调试。

个人经验写入 BigQuery 的流对于少量数据来说很便宜。对于一些美分,我建议您考虑第一个解决方案,您可以为此付费。管理和代码更小/更容易!