Spark:每个Spark RDD分区的数据库连接,并执行mapPartition

voi*_*oid 2 scala apache-spark rdd

我想在我的spark rdd上做一个mapPartitions,

    val newRd = myRdd.mapPartitions(
      partition => {

        val connection = new DbConnection /*creates a db connection per partition*/

        val newPartition = partition.map(
           record => {
             readMatchingFromDB(record, connection)
         })
        connection.close()
        newPartition
      })
Run Code Online (Sandbox Code Playgroud)

但是,这给了我一个连接已经关闭的异常,因为预期因为在控制到达之前.map()connection已经关闭了.我想为每个RDD分区创建一个连接,并正确关闭它.我怎样才能做到这一点?

谢谢!

Tza*_*har 8

正如在讨论中提到这里 -这个问题从地图操作上的迭代器的懒惰茎partition.这种懒惰意味着对于每个分区,创建并关闭连接,并且仅在稍后(当执行RDD时)readMatchingFromDB被调用.

要解决此问题,您应该在关闭连接之前强制执行迭代器的热切遍历,例如将其转换为列表(然后返回):

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close()
  newPartition.iterator // create a new iterator
})
Run Code Online (Sandbox Code Playgroud)