Does Spark 2.4.0 still have the 2GB shuffle block size limit?

Wan*_*Wei 4 java apache-spark

I know about the 2GB limit issue, but I have already set spark.maxRemoteBlockSizeFetchToMem to a value below 2GB, yet I still see the 2GB limit being hit:

19/03/30 06:48:43 INFO CoarseGrainedExecutorBackend: Got assigned task 2008
19/03/30 06:48:43 INFO Executor: Running task 195.0 in stage 4.0 (TID 2008)
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Getting 289 non-empty blocks including 68 local blocks and 221 remote blocks
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Started 3 remote fetches in 1 ms
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Getting 270 non-empty blocks including 67 local blocks and 203 remote blocks
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Started 3 remote fetches in 3 ms
19/03/30 06:48:43 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:48:43 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:48:43 INFO MemoryStore: Block rdd_23_160 stored as values in memory (estimated size 42.0 MB, free 10.2 GB)
19/03/30 06:48:43 INFO Executor: Finished task 160.0 in stage 4.0 (TID 1973). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_170 stored as values in memory (estimated size 49.9 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 170.0 in stage 4.0 (TID 1983). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_148 stored as values in memory (estimated size 79.8 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 148.0 in stage 4.0 (TID 1962). 2184 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_168 stored as values in memory (estimated size 46.9 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 168.0 in stage 4.0 (TID 1981). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_179 stored as values in memory (estimated size 72.7 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 179.0 in stage 4.0 (TID 1994). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_151 stored as values in memory (estimated size 59.9 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 151.0 in stage 4.0 (TID 1964). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_177 stored as values in memory (estimated size 52.2 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 177.0 in stage 4.0 (TID 1990). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_166 stored as values in memory (estimated size 140.4 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 166.0 in stage 4.0 (TID 1979). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_183 stored as values in memory (estimated size 44.0 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 183.0 in stage 4.0 (TID 1996). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_195 stored as values in memory (estimated size 47.0 MB, free 10.4 GB)
19/03/30 06:48:45 INFO Executor: Finished task 195.0 in stage 4.0 (TID 2008). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_192 stored as values in memory (estimated size 92.1 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 192.0 in stage 4.0 (TID 2006). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_182 stored as values in memory (estimated size 93.4 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 182.0 in stage 4.0 (TID 1995). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_171 stored as values in memory (estimated size 125.3 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 171.0 in stage 4.0 (TID 1988). 2141 bytes result sent to driver
19/03/30 06:48:46 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:49:02 WARN BlockManager: Putting block rdd_23_167 failed due to exception java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 1504 because the size after growing exceeds size limitation 2147483632.
19/03/30 06:49:02 WARN BlockManager: Block rdd_23_167 could not be removed as it was not found on disk or in memory
19/03/30 06:49:02 ERROR Executor: Exception in task 167.0 in stage 4.0 (TID 1980)
java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 1504 because the size after growing exceeds size limitation 2147483632
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:62)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.writeAlignedBytes(UnsafeWriter.java:175)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:148)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:234)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I ran into the same problem on Spark 2.3.0, and it persists after upgrading to 2.4.0.

My dataset is roughly 100GB, the data is somewhat skewed, and I am running a large aggregation that involves shuffle reads. Does anyone have tips or ideas?
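For reference, the setting was applied roughly as below; this is just a sketch, and the exact value shown is illustrative rather than the one from my job:

    import org.apache.spark.sql.SparkSession

    // Any value below 2g forces remote shuffle blocks larger than it
    // to be fetched to disk instead of into executor memory.
    val spark = SparkSession.builder()
      .config("spark.maxRemoteBlockSizeFetchToMem", "1g") // illustrative value
      .getOrCreate()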

vaq*_*han 5

You are getting the error for the reasons below. Note that the stack trace fails inside BufferHolder.grow while materializing an aggregation result row, not during a remote shuffle fetch, which is why lowering spark.maxRemoteBlockSizeFetchToMem does not help here:

1) Not enough partitions: after loading the data you need a repartition() so that the data is spread (via a shuffle) across the other nodes of the cluster. This gives you the parallelism required for faster processing (a sketch follows after this list).

2) Data skew caused by a poor choice of partition key. The average block size for a non-skewed data source is

  • (total data size) / (#mappers) / (#reducers), which typically means dividing by roughly

  • 100*100 to 1000*1000,

so we usually see individual block sizes on the order of KBs or MBs.
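As a worked illustration of that arithmetic (the task counts are assumptions, not measurements from the job above): with a ~100GB dataset and 200 map tasks times 200 reduce tasks, the average shuffle block is about 100e9 / 200 / 200 ≈ 2.5MB, far below the 2GB ceiling. A minimal repartitioning sketch in that spirit, with a hypothetical input path and partition count:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // More shuffle partitions for SQL aggregations, so that no single
    // partition (and no single buffered result row) approaches 2GB.
    spark.conf.set("spark.sql.shuffle.partitions", "2000")

    // Repartition right after loading and before the heavy aggregation;
    // the path and the count of 2000 are illustrative only.
    val df = spark.read.parquet("/path/to/input").repartition(2000)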

The LArray library can handle data larger than 2GB, which is the limit for Java byte arrays and memory-mapped files.
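The 2GB figure comes from the JVM itself: arrays are indexed with a 32-bit Int, so a byte array can hold at most Int.MaxValue elements, and Spark's BufferHolder stops 15 bytes short of that (2147483632, the number in the error above) to stay clear of VM-reserved header space. A tiny sketch of the boundary:

    // The hard JVM ceiling is Int.MaxValue (2147483647) elements;
    // Spark's BufferHolder uses Int.MaxValue - 15 = 2147483632,
    // the exact value quoted in the error message.
    val jvmCeiling   = Int.MaxValue        // 2147483647
    val sparkCeiling = Int.MaxValue - 15   // 2147483632

    // Allocating right at the ceiling fails on most JVMs:
    // new Array[Byte](Int.MaxValue)  // OutOfMemoryError: requested
    //                                // array size exceeds VM limit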

Here you can find good links: