我想在我的spark rdd上做一个mapPartitions,
val newRd = myRdd.mapPartitions(
partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})
Run Code Online (Sandbox Code Playgroud)
但是,这给了我一个连接已经关闭的异常,因为预期因为在控制到达之前.map()我connection已经关闭了.我想为每个RDD分区创建一个连接,并正确关闭它.我怎样才能做到这一点?
谢谢!