Rag*_*rni 5 scala apache-spark
我有两个 rdd,我想为 rdd1 的每个项目对 RDD2 项目进行一些计算。所以,我在用户定义的函数中传递 RDD2,如下所示,但我收到类似rdd1 cannot be passed in another rdd. 如果我想在两个 rdd 上执行操作,我能知道如何实现吗?
例如:
RDD1.map(line =>function(line,RDD2))
正如错误所示,Spark 不支持嵌套 RDD。通常你必须通过重新设计算法来解决这个问题。
如何做到这一点取决于实际的用例、到底发生了什么function以及它的输出是什么。
有时RDD1.cartesian(RDD2),对每个元组进行操作然后按键减少会起作用。有时,如果您(K,V)输入了两个 RDD 之间的连接就可以了。
如果 RDD2 很小,您始终可以在驱动程序中收集它,将其设为广播变量并在中使用该变量function而不是RDD2.
@编辑:
例如,我们假设您的 RDD 保存字符串,并计算给定记录在以下位置出现的function次数:RDDRDD2
def function(line: String, rdd: RDD[String]): (String, Int) = {
(line, rdd.filter(_ == line).count)
}
Run Code Online (Sandbox Code Playgroud)
这将返回一个RDD[(String, Int)].
想法1
您可以尝试使用RDD 的方法来使用笛卡尔积cartesian。
val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String, String)]
.map( (r1,r2) => (r1, function2) ) // creates RDD[(String, Int)]
.reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String, Int)]
Run Code Online (Sandbox Code Playgroud)
这里function2接受r1and (它们是字符串),如果它们相等则r2返回,否则返回。最终的映射将产生一个元组,其中键是记录,值是总计数。10RDDr1
问题1:不过,如果 中有重复的字符串,这将不起作用RDD1。你必须考虑一下。如果RDD1记录有一些唯一的 ID,那就完美了。
问题2:这确实会创建很多对(对于两个 RDD 中的 100 万条记录,它将创建大约 5000 亿对),速度会很慢,并且很可能会导致大量的洗牌。
想法2
我不明白你对 RDD2 大小的评论lacs,所以这可能有效也可能无效:
val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line, rdd2array))
Run Code Online (Sandbox Code Playgroud)
问题:这可能会破坏你的记忆。collect()被调用driver,并且all来自的记录rdd2将被加载到驱动程序节点上的内存中。
想法3
根据用例,还有其他方法可以克服这个问题,例如相似性搜索的强力算法与您的用例相似(不是双关语)。替代解决方案之一是局部敏感哈希。