在spark中加入两个RDD

sri*_*ala 3 scala apache-spark

我有两个rdd一个rdd只有一列其他有两列加入两个RDD上的键我已经添加了虚拟值0,有没有其他有效的方法这样做使用连接?

val lines = sc.textFile("ml-100k/u.data")
val movienamesfile = sc.textFile("Cml-100k/u.item")

val moviesid = lines.map(x => x.split("\t")).map(x => (x(1),0))
val test = moviesid.map(x => x._1)
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val shit = movienames.join(moviesid).distinct()
Run Code Online (Sandbox Code Playgroud)

编辑:

让我在SQL中转换这个问题.比方说我有table1 (moveid)table2 (movieid,moviename).在SQL中我们写了类似的东西:

select moviename, movieid, count(1)
from table2 inner join table table1 on table1.movieid=table2.moveid 
group by ....
Run Code Online (Sandbox Code Playgroud)

这里的SQL table1只有一列,其中table2有两列仍然可以join工作,Spark中的相同方式可以连接来自两个RDD的键.

zer*_*323 8

仅定义连接操作,PairwiseRDDs它与SQL中的关系/表完全不同.每个元素PairwiseRDD都是Tuple2第一个元素是key第二个元素,第二个元素是第二个元素value.两者都可以包含复杂对象,只要key提供有意义的对象hashCode

如果你想在SQL-ish中考虑这个问题,你可以将key视为转到ON子句并value包含所选列的所有内容.

SELECT table1.value, table2.value
FROM table1 JOIN table2 ON table1.key = table2.key
Run Code Online (Sandbox Code Playgroud)

虽然这些方法乍看之下看起来相似,但您可以使用另一种表达方式,但有一个根本区别.当你在SQL表,你忽略了约束所有列属于同一类的对象,而keyvaluePairwiseRDD有一个明确的含义.

回到你的问题使用join你需要keyvalue.可以说比使用0占位符更清洁的是使用null单例,但实际上没有办法解决它.

对于小数据,您可以使用类似于广播连接的过滤器:

val moviesidBD = sc.broadcast(
  lines.map(x => x.split("\t")).map(_.head).collect.toSet)

movienames.filter{case (id, _) => moviesidBD.value contains id}
Run Code Online (Sandbox Code Playgroud)

但如果您真的想要SQL-ish加入,那么您应该只使用SparkSQL.

val movieIdsDf = lines
   .map(x => x.split("\t"))
   .map(a => Tuple1(a.head))
   .toDF("id")

val movienamesDf = movienames.toDF("id", "name")

// Add optional join type qualifier 
movienamesDf.join(movieIdsDf, movieIdsDf("id") <=> movienamesDf("id"))
Run Code Online (Sandbox Code Playgroud)