比较spark中两个RDD中的数据

Ram*_*hna 4 scala-2.10 apache-spark cloudera-cdh rdd

我可以使用以下代码在两个RDD中打印数据.

usersRDD.foreach(println)
empRDD.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我需要比较两个RDD中的数据.如何迭代和比较一个RDD中的字段数据与另一个RDD中的字段数据.例如:迭代记录并检查名称和年龄userRDD是否具有匹配记录empRDD,如果没有放入单独的RDD中.

我尝试过,userRDD.substract(empRDD)但它正在比较所有的领域.

Sea*_*wen 6

您需要键入每个RDD中的数据,以便有一些东西可以连接记录.看看groupBy例如.然后你join得到RDD.对于每个键,您将获得两者中的匹配值.如果您有兴趣找到不匹配的密钥,请使用leftOuterJoin,如下所示:

// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
  userRDD.leftOuterJoin(empRDD).collect {
    case (name, (age, None)) => name -> age
  }
}
Run Code Online (Sandbox Code Playgroud)