减少之前如何避免大的中间结果?

use*_*956 2 mapreduce apache-spark rdd

我在火花工作中遇到一个令我惊讶的错误:

 Total size of serialized results of 102 tasks (1029.6 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)

我的工作是这样的:

def add(a,b): return a+b
sums = rdd.mapPartitions(func).reduce(add)
Run Code Online (Sandbox Code Playgroud)

rdd有~500个分区,func获取该分区中的行并返回一个大数组(一个1.3M双精度或~10Mb的numpy数组).我想总结所有这些结果并返回它们的总和.

Spark似乎在内存中保存了mapPartitions(func)的总结果(大约5gb),而不是以递增方式处理它,这需要大约30Mb.

而不是增加spark.driver.maxResultSize,有没有办法逐步执行reduce?


更新:实际上我有点惊讶的是,更多的两个结果都记在内存中.

use*_*411 5

这里没有什么特别令人惊讶的.使用reduceSpark时,会对驱动程序进行最终减少.如果func返回单个对象,则实际上相当于:

reduce(add, rdd.collect())
Run Code Online (Sandbox Code Playgroud)

你可以使用treeReduce:

import math

# Keep maximum possible depth
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions()))
Run Code Online (Sandbox Code Playgroud)

或者toLocalIterator:

sum(rdd.toLocalIterator())
Run Code Online (Sandbox Code Playgroud)

前者将以增加的网络交换为代价递归地合并工作者的分区.您可以使用depth参数调整性能.

后者只会在当时收集一个分区,但可能需要重新评估,rdd并且作业的重要部分将由驱动程序执行.

根据func您使用的确切逻辑,您还可以通过将矩阵拆分为块并逐块执行添加来改善工作分布,例如使用BlockMatrices