Sae*_*ham 3 google-cloud-dataflow apache-beam spotify-scio
我想加载流数据,然后添加一个键,然后按键计数。
当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow 管道出现内存错误。因为似乎数据是按分组累积的,并且不会在触发每个窗口时更早地触发数据。
如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。
我对两者进行了测试:
梁版本 2.11.0 和 scio 版本 0.7.4
梁版本 2.6.0 和 scio 版本 0.6.1
如您所见,数据是按组累积的,不会发出。
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
Run Code Online (Sandbox Code Playgroud)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
是否有任何解决方案可以通过强制 group-by 发出每个窗口的早期结果来解决内存问题。
小智 5
KeyCommitTooLargeException 不是内存问题,而是 protobuf 序列化问题。Protobuf 的对象限制为 2GB(google protobuf 最大大小)。Dataflow 发现管道中单个键的值大于 2GB,因此无法对数据进行 shuffle。错误消息表明“这可能是由于在没有使用组合的情况下在单个窗口中对大量数据进行分组,或者从单个输入元素生成大量数据造成的。” 根据您的管道设置(即分配的随机密钥),后者更有可能。
管道可能从 GCS 读取了一个大文件 (>2GB) 并将其分配给一个随机密钥。GroupByKey 需要密钥 shuffle 操作,而 Dataflow 由于 protobuf 限制而无法执行,因此卡在该密钥上并保留水印。
如果单个键的值较大,您可能希望减小值的大小,例如,压缩字符串,或将字符串拆分为多个键,或者首先生成较小的 GCS 文件。
如果较大的值来自多个键的分组,您可能希望增加键空间,以便按键操作的每个组最终将较少的键组合在一起。
| 归档时间: |
|
| 查看次数: |
1840 次 |
| 最近记录: |