相关疑难解决方法(0)

Apache Spark:map vs mapPartitions?

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.

(编辑)即,两者之间的差异(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }
Run Code Online (Sandbox Code Playgroud)

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd

119
推荐指数
3
解决办法
10万
查看次数

Spark:如何使用mapPartition并为每个分区创建/关闭连接

所以,我想对我的spark DataFrame进行某些操作,将它们写入DB并在最后创建另一个DataFrame.它看起来像这样:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()
Run Code Online (Sandbox Code Playgroud)

这给了我一个错误,因为mapPartitions期望返回类型Iterator[NotInferedR],但在这里Unit.我知道这可以用forEachPartition,但我也想做映射.分开进行将是一个开销(额外的火花工作).该怎么办?

谢谢!

scala apache-spark rdd

8
推荐指数
2
解决办法
6550
查看次数

标签 统计

apache-spark ×2

rdd ×2

scala ×2

performance ×1