在不同的类中访问Spark广播变量

Alo*_*lok 4 scala apache-spark spark-streaming apache-spark-sql

我在Spark Streaming应用程序中广播一个值.但我不知道如何在与播放它的类不同的类中访问该变量.

我的代码如下:

object AppMain{
  def main(args: Array[String]){
    //...
    val broadcastA = sc.broadcast(a)
    //..
    lines.foreachRDD(rdd => {
    val obj = AppObject1
    rdd.filter(p => obj.apply(p))
    rdd.count
  }
}

object AppObject1: Boolean{
  def apply(str: String){
    AnotherObject.process(str)
  }
}
object AnotherObject{
  // I want to use broadcast variable in this object
  val B = broadcastA.Value // compilation error here
  def process(): Boolean{
   //need to use B inside this method
  }
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,任何人都可以建议如何访问广播变量?

zer*_*323 6

这里没有特别的Spark特定,忽略了可能的序列化问题.如果你想使用某个对象,它必须在当前范围内可用,你可以像往常一样实现:

  • 您可以在已定义广播的范围内定义助手:

    {
        ...
        val x = sc.broadcast(1)
        object Foo {
          def foo = x.value
        }
        ...
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 您可以将它用作构造函数参数:

    case class Foo(x: org.apache.spark.broadcast.Broadcast[Int]) {
      def foo = x.value
    }
    
    ...
    
    Foo(sc.broadcast(1)).foo
    
    Run Code Online (Sandbox Code Playgroud)
  • 方法论证

    case class Foo() {
      def foo(x: org.apache.spark.broadcast.Broadcast[Int]) = x.value
    }
    
    ...
    
    Foo().foo(sc.broadcast(1))
    
    Run Code Online (Sandbox Code Playgroud)
  • 或者甚至混合你的助手:

    trait Foo {
      val x: org.apache.spark.broadcast.Broadcast[Int]
      def foo = x.value
    }
    
    object Main extends Foo {
      val sc = new SparkContext("local",  "test", new SparkConf())
      val x = sc.broadcast(1)
    
      def main(args: Array[String]) {
        sc.parallelize(Seq(None)).map(_ => foo).first
        sc.stop
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

  • 作为函数参数传递广播变量有什么性能影响(例如在curried map函数中:`map(bcast)(row)`)?典型的Spark示例始终使用它在实例的同一范围内实例化广播变量.如果您想将地图功能移出范围但仍引用广播变量,该怎么办? (2认同)