如何更新火花流中的广播变量?

And*_*bbs 29 java scala broadcast apache-spark spark-streaming

我相信,我有一个相对常见的火花流用例:

我有一个对象流,我想根据一些参考数据进行过滤

最初,我认为使用广播变量实现这是一件非常简单的事情:

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}
Run Code Online (Sandbox Code Playgroud)

但是,尽管很少,我的参考数据会定期更改

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个工作者,但是Broadcast对象不是也不Serializable需要final.

我有什么替代品?我能想到的三个解决方案是:

  1. 将参考数据查找移动到一个forEachPartitionforEachRdd左右,使其完全驻留在工作者上.但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库.

  2. 每次refdata更改时,使用新的广播变量重新启动Spark上下文.

  3. 将参考数据转换为RDD,然后join以我现在流式传输的方式将流转换为流Pair<MyObject, RefData>,尽管这会将参考数据与每个对象一起发送.

Aas*_*tha 23

通过@Rohan Aletty扩展答案.这是一个BroadcastWrapper的示例代码,它根据某些ttl刷新广播变量

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       }
       return var;
   }
}
Run Code Online (Sandbox Code Playgroud)

您的代码如下所示:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}
Run Code Online (Sandbox Code Playgroud)

这对我来说也适用于多集群.希望这可以帮助

  • 任何想法在新的结构化流API中如何工作? (5认同)
  • 此函数返回Broadcast类型对象的引用.广播类型对象将具有广播变量的标识符和块的数量.当调用refdataBroadcast.getValue()时,如果执行程序内存中存在广播标识符,则不会重新计算它.所有这些都发生在执行程序上,但是当调用sparkContext.broadcast时,驱动程序就会出现.因此,只有在刷新并重新广播变量时​​才会在驱动程序节点上执行updateAndGet(只有驱动程序可以注意). (3认同)
  • 由于我遇到类似的问题,我想知道是否有人有机会对上面的代码进行Python实现?我认为这可能是克服我目前遇到的一些困难的好方法.感谢所有帮助. (2认同)

Ram*_*ram 8

最近遇到了这个问题。认为它可能对 Scala 用户有帮助..

Scala 的做法BroadCastWrapper就像下面的例子。

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* wrapper lets us update brodcast variables within DStreams' foreachRDD
 without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
 @transient private val ssc: StreamingContext,
  @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}
Run Code Online (Sandbox Code Playgroud)

每次您需要调用更新函数来获取新的广播变量。

  • 对于那些想知道的人来说,值公开了底层广播对象以供只读;对于一些特殊的序列化情况需要 writeObject 和 readObject。请参阅:[Java 序列化](https://docs.oracle.com/javase/7/docs/api/java/io/Serializing.html) (2认同)

Rav*_*ddy 6

几乎每个处理流应用程序的人都需要一种方法来编织(过滤,查找等)参考数据(从数据库,文件等)到流数据.我们对这两部分进行了部分解决

  1. 查找要在流操作中使用的参考数据

    • 使用所需的缓存TTL创建CacheLookup对象
    • 将它包装在Broadcast中
    • 使用CacheLookup作为流逻辑的一部分

在大多数情况下,这项工作正常,但以下情况除外

  1. 更新参考数据

    尽管有这些线程中的建议,但没有明确的方法来实现这一点,即:杀死先前的广播变量并创建新的变量.多个未知数,例如这些操作之间的预期.

这是一个常见的需求,如果有办法将信息发送到广播变量通知更新,它会有所帮助.这样,就可以使"CacheLookup"中的本地缓存无效

问题的第二部分仍未解决.如果有任何可行的方法,我会感兴趣


Roh*_*tty 4

不确定您是否已经尝试过此操作,但我认为可以在不关闭SparkContext. 通过使用该unpersist()方法,广播变量的副本在每个执行器上被删除,并且需要重新广播该变量才能再次访问。对于您的用例,当您想要更新广播时,您可以:

  1. 等待您的执行者完成当前的数据系列

  2. 取消保留广播变量

  3. 更新广播变量

  4. 重新广播以将新的参考数据发送给执行者

我从这篇文章中汲取了大量的经验,但最后一个回复的人声称已经在本地工作了。需要注意的是,您可能希望将阻塞设置true为非持久性,以便确保执行器删除旧数据(这样在下一次迭代时不会再次读取过时的值)。