如何引用范围之外的Spark广播变量

ira*_*lls 6 scala apache-spark

我已经看到了星火广播变量的所有例子中使用它们(在职能范围定义它们map()join()等)。我想同时使用引用广播变量的map()函数和mapPartitions()函数,但是我想将它们模块化,以便可以将相同的函数用于单元测试。

  • 我该怎么做?

我曾经想过要使用该函数,以便在使用a mapmapPartitionscall 时将对广播变量的引用传递给它。

  • 在原始范围内定义函数时,通过传递对广播变量的引用通常会找不到这些性能影响吗?

我想到了这样的东西(伪代码):

// firstFile.scala
// ---------------

def mapper(bcast: Broadcast)(row: SomeRow): Int = {
  bcast.value(row._1)
}

def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
  val broadcastVariable = bcast.value

  for {
    i <- iter
  } yield broadcastVariable(i)
})


// secondFile.scala
// ----------------

import firstFile.{mapMyPartition, mapper}

val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))

rdd
 .map(mapper(bcastVariable))
 .mapPartitions(mapMyPartition(bcastVariable))
Run Code Online (Sandbox Code Playgroud)

Ale*_*nov 2

您的解决方案应该可以正常工作。在这两种情况下,传递给的函数map{Partitions}在序列化时将包含对广播变量本身的引用,但不包含对其值的引用,并且仅bcast.value在节点上计算时调用。

需要避免的是

def mapper(bcast: Broadcast): SomeRow => Int = {
  val value = bcast.value
  row => value(row._1)
}
Run Code Online (Sandbox Code Playgroud)