Spark:如何使用mapPartition并为每个分区创建/关闭连接

voi*_*oid 8 scala apache-spark rdd

所以,我想对我的spark DataFrame进行某些操作,将它们写入DB并在最后创建另一个DataFrame.它看起来像这样:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()
Run Code Online (Sandbox Code Playgroud)

这给了我一个错误,因为mapPartitions期望返回类型Iterator[NotInferedR],但在这里Unit.我知道这可以用forEachPartition,但我也想做映射.分开进行将是一个开销(额外的火花工作).该怎么办?

谢谢!

小智 12

在大多数情况下,如果不减慢作业速度,那么急于使用迭代器将导致执行失败.因此,我所做的是检查迭代器是否已经为空,然后执行清理例程.

rdd.mapPartitions(itr => {
    val conn = new DbConnection
    itr.map(data => {
       val yourActualResult = // do something with your data and conn here
       if(itr.isEmpty) conn.close // close the connection
       yourActualResult
    })
})
Run Code Online (Sandbox Code Playgroud)

起初认为这是一个火花问题但实际上是一个scala.http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean


Tza*_*har 9

匿名函数实现中的最后一个表达式必须是返回值:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    // using toList to force eager computation - make it happen now when connection is open
    val result = iterator.map(/* the same... */).toList
    conn.writeTheBatchToDB()
    conn.close()
    result.iterator
  }
).toDF()
Run Code Online (Sandbox Code Playgroud)

  • 如果我必须在`iterator.map()`函数中使用`conn`怎么办?我不会得到已经关闭的连接异常吗? (2认同)
  • 如果没有toList,我会以相同的懒惰方式关闭它:`(iterator.map(...)++ Seq(null)).filter(_!= null || {close; false})` (2认同)