Rya*_*anH 48 scala apache-spark
我有两个我想加入的RDD,它们看起来像这样:
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Run Code Online (Sandbox Code Playgroud)
碰巧的是,键值rdd1是唯一的,并且元组键值rdd2是唯一的.我想加入这两个数据集,以便获得以下rdd:
val rdd_joined:RDD[((T,W), (U,V))]
Run Code Online (Sandbox Code Playgroud)
实现这一目标的最有效方法是什么?以下是我想到的一些想法.
选项1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Run Code Online (Sandbox Code Playgroud)
选项2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Run Code Online (Sandbox Code Playgroud)
选项1将收集掌握的所有数据,对吧?因此,如果rdd1很大(在我的情况下它相对较大,虽然比rdd2小一个数量级),这似乎不是一个好的选择.选项2做了一个丑陋的独特和笛卡尔产品,这似乎也非常低效.我想到的另一种可能性(但尚未尝试)是做选项1并广播地图,尽管以"智能"方式进行广播会更好,这样地图的按键与钥匙rdd2.
有没有人遇到过这种情况?我很乐意有你的想法.
谢谢!
Jos*_*sen 59
一种选择是通过收集rdd1驱动程序并将其广播给所有映射器来执行广播连接; 正确完成,这将让我们避免大型rdd2RDD 的昂贵洗牌:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
val m = rdd1Broadcast.value
for {
((t, w), u) <- iter
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
Run Code Online (Sandbox Code Playgroud)
该preservesPartitioning = true告诉火花,这个地图功能不修改的键rdd2; 这将允许Spark避免rdd2为基于(t, w)密钥连接的任何后续操作重新分区.
这种广播可能效率低下,因为它涉及到驾驶员的通信瓶颈.原则上,可以在不涉及驱动程序的情况下将一个RDD广播到另一个RDD; 我有一个原型,我想概括并添加到Spark.
另一种选择是重新映射键rdd2并使用Spark join方法; 这将涉及完全洗牌rdd2(并且可能rdd1):
rdd1.join(rdd2.map {
case ((t, w), u) => (t, (w, u))
}).map {
case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
Run Code Online (Sandbox Code Playgroud)
在我的示例输入中,这两种方法都会产生相同的结果:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
Run Code Online (Sandbox Code Playgroud)
第三种选择是重组rdd2,这t是它的关键,然后执行上面的连接.
小智 14
另一种方法是创建自定义分区程序,然后使用zipPartitions加入RDD.
import org.apache.spark.HashPartitioner
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
override def getPartition(key: Any): Int = key match {
case k: Tuple2[Int, String] => super.getPartition(k._1)
case _ => super.getPartition(key)
}
}
val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))
val result = rdd2.zipPartitions(rdd1)(
(iter2, iter1) => {
val m = iter1.toMap
for {
((t: Int, w), u) <- iter2
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}
).partitionBy(new HashPartitioner(numSplits))
result.glom.collect
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
48149 次 |
| 最近记录: |