如何通过key或filter()使用两个RDD的spark intersection()?

S.K*_*ang 3 scala intersection filter apache-spark rdd

我想用intersection()钥匙或filter()火花来使用.

但我真的不知道如何使用intersection()密钥.

所以我尝试使用filter(),但它没有用.

示例 - 这是两个RDD:

data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String, Int] = Array()
Run Code Online (Sandbox Code Playgroud)

我希望得到一个(key,value)对,其密钥与data1基于密钥的密钥相同data2.

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3)) 是我想要的结果.

是否有一种方法来解决这个问题使用intersection()键或filter()

mto*_*oto 6

对于你的问题,我认为cogroup()更适合.该intersection()方法将考虑数据中的键和值,并将导致为空rdd.

该功能cogroup()组两者的值rdd的由键,给了我们(key, vals1, vals2),其中vals1vals2包含的价值观data1data2分别为每个键.请注意,如果某个键并不在这两个数据集,一个共享vals1vals2将返回一个空Seq的,因此我们首先要筛选出这些元组在到达路口两个rdd的.

接下来,我们会抓住vals1-它包含的值data1为共同 -并将其转换为格式(key, Array).最后,我们使用flatMapValues()将结果解压缩为(key, value).

val result = (data1.cogroup(data2)
  .filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
  .map{case (k, (vals1, vals2)) => (k, vals1.toArray)}
  .flatMapValues(identity[Array[Int]]))

result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))
Run Code Online (Sandbox Code Playgroud)