ira*_*lls 6 scala apache-spark
我已经看到了星火广播变量的所有例子中使用它们(在职能范围定义它们map(),join()等)。我想同时使用引用广播变量的map()函数和mapPartitions()函数,但是我想将它们模块化,以便可以将相同的函数用于单元测试。
我曾经想过要使用该函数,以便在使用a map或mapPartitionscall 时将对广播变量的引用传递给它。
我想到了这样的东西(伪代码):
// firstFile.scala
// ---------------
def mapper(bcast: Broadcast)(row: SomeRow): Int = {
bcast.value(row._1)
}
def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
val broadcastVariable = bcast.value
for {
i <- iter
} yield broadcastVariable(i)
})
// secondFile.scala
// ----------------
import firstFile.{mapMyPartition, mapper}
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
.map(mapper(bcastVariable))
.mapPartitions(mapMyPartition(bcastVariable))
Run Code Online (Sandbox Code Playgroud)
您的解决方案应该可以正常工作。在这两种情况下,传递给的函数map{Partitions}在序列化时将包含对广播变量本身的引用,但不包含对其值的引用,并且仅bcast.value在节点上计算时调用。
需要避免的是
def mapper(bcast: Broadcast): SomeRow => Int = {
val value = bcast.value
row => value(row._1)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2136 次 |
| 最近记录: |