RDD map和mapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.
(编辑)即,两者之间的差异(在语义上或在执行方面)
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
Run Code Online (Sandbox Code Playgroud)
和:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
Run Code Online (Sandbox Code Playgroud) 所以,我想对我的spark DataFrame进行某些操作,将它们写入DB并在最后创建另一个DataFrame.它看起来像这样:
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
iterator.map(
row => {
addRowToBatch(row)
convertRowToObject(row)
})
conn.writeTheBatchToDB()
conn.close()
})
.toDF()
Run Code Online (Sandbox Code Playgroud)
这给了我一个错误,因为mapPartitions期望返回类型Iterator[NotInferedR],但在这里Unit.我知道这可以用forEachPartition,但我也想做映射.分开进行将是一个开销(额外的火花工作).该怎么办?
谢谢!