voi*_*oid 2 scala apache-spark rdd
我想在我的spark rdd上做一个mapPartitions,
val newRd = myRdd.mapPartitions(
partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})
Run Code Online (Sandbox Code Playgroud)
但是,这给了我一个连接已经关闭的异常,因为预期因为在控制到达之前.map()我connection已经关闭了.我想为每个RDD分区创建一个连接,并正确关闭它.我怎样才能做到这一点?
谢谢!
正如在讨论中提到这里 -这个问题从地图操作上的迭代器的懒惰茎partition.这种懒惰意味着对于每个分区,创建并关闭连接,并且仅在稍后(当执行RDD时)readMatchingFromDB被调用.
要解决此问题,您应该在关闭连接之前强制执行迭代器的热切遍历,例如将其转换为列表(然后返回):
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close()
newPartition.iterator // create a new iterator
})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4406 次 |
| 最近记录: |