如何从RDD创建RDD集合?

mat*_*nja 6 scala apache-spark

我有一个RDD[String],wordRDD.我还有一个从字符串/单词创建RDD [String]的函数.我想创建一个新的RDD 每串wordRDD.以下是我的尝试:

1)失败,因为Spark不支持嵌套的RDD:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})
Run Code Online (Sandbox Code Playgroud)

2)失败(可能是由于范围问题?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}
Run Code Online (Sandbox Code Playgroud)

我理想的结果如下:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)
Run Code Online (Sandbox Code Playgroud)

我在这里找到了一个相关的问题:当很多RDD抛出堆栈溢出错误时,Spark会发生火花,但它没有解决我的问题.

Cli*_*der 3

用来flatMap获得RDD[String]你想要的。

var allWords = wordRDD.flatMap { word => 
  (new MyClass(word)).myFunction().collect()
}
Run Code Online (Sandbox Code Playgroud)

  • 这应该如何并行运行?`wordRDD.map` 中发生的所有事情都在集群上执行。因此,内部“collect”必须从正在运行的作业中触发新的 Spark 作业。我怀疑它不会分布式运行。 (2认同)