use*_*956 2 mapreduce apache-spark rdd
我在火花工作中遇到一个令我惊讶的错误:
Total size of serialized results of 102 tasks (1029.6 MB) is
bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)
我的工作是这样的:
def add(a,b): return a+b
sums = rdd.mapPartitions(func).reduce(add)
Run Code Online (Sandbox Code Playgroud)
rdd有~500个分区,func获取该分区中的行并返回一个大数组(一个1.3M双精度或~10Mb的numpy数组).我想总结所有这些结果并返回它们的总和.
Spark似乎在内存中保存了mapPartitions(func)的总结果(大约5gb),而不是以递增方式处理它,这需要大约30Mb.
而不是增加spark.driver.maxResultSize,有没有办法逐步执行reduce?
更新:实际上我有点惊讶的是,更多的两个结果都记在内存中.
这里没有什么特别令人惊讶的.使用reduceSpark时,会对驱动程序进行最终减少.如果func返回单个对象,则实际上相当于:
reduce(add, rdd.collect())
Run Code Online (Sandbox Code Playgroud)
你可以使用treeReduce:
import math
# Keep maximum possible depth
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions()))
Run Code Online (Sandbox Code Playgroud)
sum(rdd.toLocalIterator())
Run Code Online (Sandbox Code Playgroud)
前者将以增加的网络交换为代价递归地合并工作者的分区.您可以使用depth参数调整性能.
后者只会在当时收集一个分区,但可能需要重新评估,rdd并且作业的重要部分将由驱动程序执行.
根据func您使用的确切逻辑,您还可以通过将矩阵拆分为块并逐块执行添加来改善工作分布,例如使用BlockMatrices
| 归档时间: |
|
| 查看次数: |
333 次 |
| 最近记录: |