RDD中的MapValues和爆炸

Cap*_*ree 1 scala apache-spark

我下面有这个示例RDD(rdd以下称为)。数据集是一个元组(String, Int)

(some | random | value, 10)
(some | random | value, 11)
(some | random | value, 12)
Run Code Online (Sandbox Code Playgroud)

我想得到以下输出:

(some, 10)
(random, 10)
(value, 10)
(some, 11)
(random, 11)
(value, 11)
(some, 12)
(random, 12)
(value, 12)
Run Code Online (Sandbox Code Playgroud)

我有这个Scala代码来尝试上述转换:

rdd.map(tuple => tuple._1.split("|").foreach(elemInArray => (elemInArray, tuple._2)))
Run Code Online (Sandbox Code Playgroud)

在这段代码中,我遍历整个数据集,并将元组的第一部分拆分为|。然后,我遍历返回的数组中的每个元素,并使用每个元素和获得的计数split创建一个元组。elementtuple._1

由于某些原因,我一直得到以下结果:

()
()
()
()
()
()
()
()
()
Run Code Online (Sandbox Code Playgroud)

有人知道这个问题吗?我似乎找不到错误的地方。

Nat*_*ord 5

您实际上需要为此使用flatMap

val lt = List(("some | random | value", 10),
              ("some | random | value", 11),
              ("some | random | value", 12))

val convert: ((String, Int)) => List[(String, Int)] = tuple => tuple._1.split('|').map(str =>
  (str, tuple._2)).toList

val t = lt.flatMap(convert)
Run Code Online (Sandbox Code Playgroud)

如我们所见,定义convert函数可能非常有用,因为我们可以通过向函数传递单个元素来确保正确处理每个元素。然后,我们可以将相同的函数传递给flatMap,这会将convert产生的结果列表聚合为一个列表。

以上收益:

t: List[(String, Int)] = List((some ,10), 
                              ( random ,10), 
                              ( value,10), 
                              (some ,11), 
                              ( random ,11), 
                              ( value,11), 
                              (some ,12), 
                              ( random ,12),
                              ( value,12))
Run Code Online (Sandbox Code Playgroud)

显然,我不必理会结果中多余的空格字符,但这可以通过使用以下方法更新convert函数来轻松解决trim

val convert: ((String, Int)) => List[(String, Int)] = tuple => tuple._1.split('|').map(str =>
  (str.trim, tuple._2)).toList
Run Code Online (Sandbox Code Playgroud)