我已经看到了星火广播变量的所有例子中使用它们(在职能范围定义它们map(),join()等)。我想同时使用引用广播变量的map()函数和mapPartitions()函数,但是我想将它们模块化,以便可以将相同的函数用于单元测试。
我曾经想过要使用该函数,以便在使用a map或mapPartitionscall 时将对广播变量的引用传递给它。
我想到了这样的东西(伪代码):
// firstFile.scala
// ---------------
def mapper(bcast: Broadcast)(row: SomeRow): Int = {
bcast.value(row._1)
}
def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
val broadcastVariable = bcast.value
for {
i <- iter
} yield broadcastVariable(i)
})
// secondFile.scala
// ----------------
import firstFile.{mapMyPartition, mapper}
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
.map(mapper(bcastVariable))
.mapPartitions(mapMyPartition(bcastVariable))
Run Code Online (Sandbox Code Playgroud)