ben*_*ben 5 google-bigquery google-cloud-platform google-cloud-dataflow apache-beam
我的管道:Kafka - > Dataflow streaming(Beam v2.3) - > BigQuery
鉴于在我的情况下低延迟并不重要,我使用FILE_LOADS来降低成本,如下所示:
BigQueryIO.writeTableRows()
.withJsonSchema(schema)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(triggeringFrequency)
.withCustomGcsTempLocation(gcsTempLocation)
.withNumFileShards(numFileShards)
.withoutValidation()
.to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
...
}
}
Run Code Online (Sandbox Code Playgroud)
这个数据流步骤在管道中引入了一个总是更大的延迟,因此它无法跟上Kafka吞吐量(小于50k事件/秒),即使有40 n1-standard-s4名工作人员.如下面的屏幕截图所示,此步骤的系统滞后非常大(接近管道正常运行时间),而Kafka系统滞后只有几秒钟.
如果我理解正确,Dataflow会将元素写入gcsTempLocation中的numFileShards,并且每个triggeringFrequency都会启动一个加载作业,将它们插入到BigQuery中.例如,如果我选择5分钟的触发bq ls -a -j频率,我可以看到(有)所有负载作业需要不到1分钟才能完成.但仍然是这一步骤引入了越来越多的延迟,导致Kafka消耗越来越少的元素(由于bcackpressure).增加/减少numFileShards和triggeringFrequency并不能解决问题.
我没有手动指定任何窗口,我只是默认窗口.文件不会在gcsTempLocation中累积.
知道这里出了什么问题吗?
您提到您没有显式指定窗口,这意味着默认情况下数据流将使用“全局窗口”。窗口文档包含此警告:
注意:Dataflow 的默认窗口行为是将 PCollection 的所有元素分配给单个全局窗口,即使对于无界 PCollection 也是如此。在无界 PCollection 上使用 GroupByKey 等分组转换之前,必须设置非全局窗口函数。请参阅设置 PCollection 的窗口功能。
如果您没有为无界 PCollection 设置非全局窗口函数,并随后使用 GroupByKey 或合并等分组转换,则您的管道将在构建时生成错误,并且您的 Dataflow 作业将失败。
您也可以为 PCollection 设置非默认触发器,以允许全局窗口在某些其他条件下发出“早期”结果。
您的管道似乎没有进行任何显式分组,但我想知道通过 BigQuery 写入进行的内部分组是否会导致问题。
您可以在 UI 中看到您的下游是否DropInputs收到了任何元素吗?如果不是,则表明数据在上游 BigQuery 步骤中受到阻碍。
| 归档时间: |
|
| 查看次数: |
895 次 |
| 最近记录: |