在Spark数据集中滚动自己的reduceByKey

Car*_*cas 22 scala mapreduce apache-spark

我正在尝试学习除了RDD之外还要使用DataFrames和DataSet.对于RDD,我知道我可以做someRDD.reduceByKey((x,y) => x + y),但我没有看到数据集的功能.所以我决定写一个.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})
Run Code Online (Sandbox Code Playgroud)

但是,这会将所有内容返回给驱动程序.你怎么写这个回来Dataset?也许mapPartition并在那里做?

注意此编译但不运行,因为它没有对编码器Map

blu*_*e10 29

我假设你的目标是将这个成语翻译成数据集:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)
Run Code Online (Sandbox Code Playgroud)

目前,我在Dataset API中找到的最接近的解决方案如下所示:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.
Run Code Online (Sandbox Code Playgroud)

看起来不是很优雅,根据快速基准,它的性能也差得多,所以也许我们在这里遗漏了一些东西......

注意:替代方案可能是groupByKey(_.someKey)作为第一步使用.问题是使用groupByKey将类型从常规更改Dataset为a KeyValueGroupedDataset.后者没有常规map功能.相反,它提供了一个mapGroups看似不太方便的东西,因为它将值包装成一个Iterator并根据文档字符串执行shuffle.

  • 似乎reduceGroups的性能已经从2.0.1和2.1.0修复[Spark-16391](https://issues.apache.org/jira/browse/SPARK-16391) (9认同)
  • 这样就可以了.但请注意,reduceByKey更有效,因为它会在洗牌之前减少每个节点.执行groupByKey首先洗牌所有元素然后开始减少.这就是为什么它的性能要低得多.有趣的是,在我了解reduceByKey之前,这就是我以前做的事情,但我忘记了:-) (7认同)

Jus*_*ond 7

mapPartitions之前使用更有效的解决方案groupByKey来减少混洗量(注意这不是完全相同的签名,reduceByKey但我认为传递函数比要求数据集由元组组成更灵活).

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
  (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}
Run Code Online (Sandbox Code Playgroud)

根据数据的形状/大小,这是在性能的1秒内reduceByKey,并且大约2x相当于a groupByKey(_._1).reduceGroups.仍有改进的余地,欢迎提出建议.