sri*_*ala 3 scala apache-spark
我有两个rdd一个rdd只有一列其他有两列加入两个RDD上的键我已经添加了虚拟值0,有没有其他有效的方法这样做使用连接?
val lines = sc.textFile("ml-100k/u.data")
val movienamesfile = sc.textFile("Cml-100k/u.item")
val moviesid = lines.map(x => x.split("\t")).map(x => (x(1),0))
val test = moviesid.map(x => x._1)
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val shit = movienames.join(moviesid).distinct()
Run Code Online (Sandbox Code Playgroud)
编辑:
让我在SQL中转换这个问题.比方说我有table1 (moveid)和table2 (movieid,moviename).在SQL中我们写了类似的东西:
select moviename, movieid, count(1)
from table2 inner join table table1 on table1.movieid=table2.moveid
group by ....
Run Code Online (Sandbox Code Playgroud)
这里的SQL table1只有一列,其中table2有两列仍然可以join工作,Spark中的相同方式可以连接来自两个RDD的键.
仅定义连接操作,PairwiseRDDs它与SQL中的关系/表完全不同.每个元素PairwiseRDD都是Tuple2第一个元素是key第二个元素,第二个元素是第二个元素value.两者都可以包含复杂对象,只要key提供有意义的对象hashCode
如果你想在SQL-ish中考虑这个问题,你可以将key视为转到ON子句并value包含所选列的所有内容.
SELECT table1.value, table2.value
FROM table1 JOIN table2 ON table1.key = table2.key
Run Code Online (Sandbox Code Playgroud)
虽然这些方法乍看之下看起来相似,但您可以使用另一种表达方式,但有一个根本区别.当你在SQL表,你忽略了约束所有列属于同一类的对象,而key和value在PairwiseRDD有一个明确的含义.
回到你的问题使用join你需要key和value.可以说比使用0占位符更清洁的是使用null单例,但实际上没有办法解决它.
对于小数据,您可以使用类似于广播连接的过滤器:
val moviesidBD = sc.broadcast(
lines.map(x => x.split("\t")).map(_.head).collect.toSet)
movienames.filter{case (id, _) => moviesidBD.value contains id}
Run Code Online (Sandbox Code Playgroud)
但如果您真的想要SQL-ish加入,那么您应该只使用SparkSQL.
val movieIdsDf = lines
.map(x => x.split("\t"))
.map(a => Tuple1(a.head))
.toDF("id")
val movienamesDf = movienames.toDF("id", "name")
// Add optional join type qualifier
movienamesDf.join(movieIdsDf, movieIdsDf("id") <=> movienamesDf("id"))
Run Code Online (Sandbox Code Playgroud)