Spark:RDD到List

bil*_*ill 15 scala list apache-spark rdd

我有一个RDD结构

RDD[(String, String)]
Run Code Online (Sandbox Code Playgroud)

我想创建2个列表(rdd的每个维度一个).

我尝试使用rdd.foreach()并填充两个ListBuffers然后将它们转换为Lists,但我猜每个节点都创建自己的ListBuffer,因为在迭代之后BufferLists是空的.我该怎么做 ?

编辑:我的方法

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList : ListBuffer[String] = new ListBuffer()

labeled.foreach(line =>
  testList += line._1
)
  val labeledList = testList.toList
  println("rdd: " + labeled.count)
  println("bufferList: " + testList.size)
  println("list: " + labeledList.size)
Run Code Online (Sandbox Code Playgroud)

结果是:

rdd: 31990654
bufferList: 0
list: 0
Run Code Online (Sandbox Code Playgroud)

Tza*_*har 19

如果你真的想创建两个列表 - 意思是,你希望将所有分布式数据收集到驱动程序应用程序中(冒着缓慢的风险OutOfMemoryError) - 你可以使用collect然后map对结果使用简单的操作:

val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)
Run Code Online (Sandbox Code Playgroud)

或者-如果你想"拆分"您的RDD分为两个RDDS -这是没有收集数据非常相似:

rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)
Run Code Online (Sandbox Code Playgroud)

第三种方法是首先映射到这两个RDD,然后收集它们中的每一个,但它与第一​​个选项没有太大差别,并且存在相同的风险和限制.