在同一个RDD上使用多个管道阻止更多IO

Itt*_*ayD 10 apache-spark

例如,如果我运行相同的RDD数字,其中一个流过滤偶数并平均它们,其他过滤器为奇数并对它们求和.如果我在同一个RDD上将其写为两个管道,这将创建两个执行,这将扫描RDD两次,这在IO方面可能很昂贵.

如何将此IO简化为只读取一次数据而不将逻辑重写为一个管道?当然,只要开发人员继续独立地处理每个管道(在实际情况下,这些管道从单独的模块加载),一个采用两个管道并将它们合并为一个的框架是可以的.

关键是不要使用cache()来实现这一点

zer*_*323 3

由于您的问题相当模糊,让我们考虑一下可用于解决此问题的一般策略。

这里的标准解决方案是缓存,但由于您明确希望避免它,所以我假设这里有一些额外的限制。它建议一些类似的解决方案,例如

也不可接受。这意味着您必须找到一些来操纵管道本身。

尽管可以将多个转换压缩在一起,但每个转换都会创建一个新的 RDD。这与您关于缓存的声明相结合,对可能的解决方案设置了相对严格的限制。

让我们从最简单的可能情况开始,其中所有管道都可以表示为单阶段作业。这限制了我们只能选择映射作业和简单的映射减少作业(就像您问题中描述的那样)。像这样的管道可以很容易地表示为本地迭代器上的一系列操作。所以下面的

import org.apache.spark.util.StatCounter

def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)

def p1(rdd: RDD[Long]) = {
  rdd
    .filter(isEven _)
    .aggregate(StatCounter())(_ merge _, _ merge _)
    .mean
}

def p2(rdd: RDD[Long]) = {
  rdd
    .filter(isOdd _)
    .reduce(_ + _)
}
Run Code Online (Sandbox Code Playgroud)

可以表示为:

def p1(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter => 
      Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
    .collect
    .reduce(_ merge _)
    .mean
}


def p2(rdd: RDD[Long]) = {
  rdd
    .mapPartitions(iter => 
      Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
    .collect
    .reduce(_ + _)
    // identity _
}
Run Code Online (Sandbox Code Playgroud)

此时我们可以将单独的作业重写如下:

def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V)  = {
  rdd.mapPartitions(iter => {
    val items = iter.toList
    Iterator((f(items.iterator), g(items.iterator)))
  })
}

def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
   rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}

def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2)) 

val rdd = sc.range(0L, 100L)

def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)


evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
Run Code Online (Sandbox Code Playgroud)

这里最大的问题是我们必须急切地评估每个分区才能应用单独的管道。这意味着与单独应用相同的逻辑相比,总体内存要求可能要高得多。如果没有缓存*,在多阶段作业的情况下它也是无用的。

另一种解决方案是按元素处理数据,但将每个项目视为 seq 元组:

def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
  rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) }
}


def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
    f: T => Boolean, g: U => Boolean) = {
  rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) }
}


def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
    (s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
  rdd.mapPartitions(iter => {
    var accT = zt
    var accU = zu
    iter.foreach { case (ts, us) => {
      accT = ts.foldLeft(accT)(s1)
      accU = us.foldLeft(accU)(s2)
    }}

    Iterator((accT, accU))
  }).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) }
}
Run Code Online (Sandbox Code Playgroud)

使用这样的 API,我们可以将初始管道表示为:

val rddSeq = rdd.map(x => (Seq(x), Seq(x)))


aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
  _ merge _, _ + _, _ merge _, _ + _
)
Run Code Online (Sandbox Code Playgroud)

这种方法比前一种方法稍微强大一些(byKey如果需要,您可以轻松实现某些方法子集),并且典型管道中的内存要求应该与核心 API 相当,但它的侵入性也明显更大。


* 您可以查看eje提供的答案来了解复用示例。