相关疑难解决方法(0)

Spark:每个Spark RDD分区的数据库连接,并执行mapPartition

我想在我的spark rdd上做一个mapPartitions,

    val newRd = myRdd.mapPartitions(
      partition => {

        val connection = new DbConnection /*creates a db connection per partition*/

        val newPartition = partition.map(
           record => {
             readMatchingFromDB(record, connection)
         })
        connection.close()
        newPartition
      })
Run Code Online (Sandbox Code Playgroud)

但是,这给了我一个连接已经关闭的异常,因为预期因为在控制到达之前.map()connection已经关闭了.我想为每个RDD分区创建一个连接,并正确关闭它.我怎样才能做到这一点?

谢谢!

scala apache-spark rdd

2
推荐指数
1
解决办法
4406
查看次数

标签 统计

apache-spark ×1

rdd ×1

scala ×1