如何解释RDD.treeAggregate

bho*_*ass 11 scala distributed-computing apache-spark rdd

我在Apache Spark代码源中遇到了这一行

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
    )
Run Code Online (Sandbox Code Playgroud)

我读这个有多个麻烦:

  • 首先,我在网上找不到任何可以解释确切treeAggregate工作方式的内容,这些内容的含义是什么.
  • 其次,这里.treeAggregate的方法名称似乎有两个()().这意味着什么?这是一些我不理解的特殊scala语法.
  • 最后,我看到seqOp和comboOp都返回一个3元素元组,它与预期的左侧变量匹配,但实际返回了哪一个?

这个陈述必须非常先进.我无法开始破译这一点.

maa*_*asg 16

treeAggregate是一种专门的实现,aggregate它将组合函数迭代地应用于分区的子集.这样做是为了防止将所有部分结果返回给驱动程序,其中单个传递减少将像经典aggregate那样发生.

出于所有实际目的,treeAggregate遵循aggregate本答案中解释的相同原则:解释Python中的聚合功能,但需要额外的参数来指示部分聚合级别的深度.

让我试着解释一下这里发生了什么:

对于聚合,我们需要零,组合器函数和reduce函数. aggregate使用currying来独立于combine和reduce函数指定零值.

然后我们可以像这样剖析上面的函数.希望这有助于理解:

val Zero: (BDV, Double, Long) = (BDV.zeros[Double](n), 0.0, 0L)
val combinerFunction: ((BDV, Double, Long), (??, ??)) => (BDV, Double, Long)  =  (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
val reducerFunction: ((BDV, Double, Long),(BDV, Double, Long)) => (BDV, Double, Long) = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
Run Code Online (Sandbox Code Playgroud)

然后我们可以treeAggregate以更易于消化的形式重写调用:

val (gradientSum, lossSum, miniBatchSize) = treeAggregate(Zero)(combinerFunction, reducerFunction)
Run Code Online (Sandbox Code Playgroud)

此表单将"生成"生成的元组到命名值中gradientSum, lossSum, miniBatchSize以供进一步使用.

请注意,treeAggregate该参数depth使用默认值声明的附加参数depth = 2,因此,在此特定调用中未提供该参数,它将采用该默认值.