And*_*bbs 29 java scala broadcast apache-spark spark-streaming
我相信,我有一个相对常见的火花流用例:
我有一个对象流,我想根据一些参考数据进行过滤
最初,我认为使用广播变量实现这是一件非常简单的事情:
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Run Code Online (Sandbox Code Playgroud)
但是,尽管很少,我的参考数据会定期更改
我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个工作者,但是Broadcast对象不是也不Serializable需要final.
我有什么替代品?我能想到的三个解决方案是:
将参考数据查找移动到一个forEachPartition或forEachRdd左右,使其完全驻留在工作者上.但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库.
每次refdata更改时,使用新的广播变量重新启动Spark上下文.
将参考数据转换为RDD,然后join以我现在流式传输的方式将流转换为流Pair<MyObject, RefData>,尽管这会将参考数据与每个对象一起发送.
Aas*_*tha 23
通过@Rohan Aletty扩展答案.这是一个BroadcastWrapper的示例代码,它根据某些ttl刷新广播变量
public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
//Your logic to refresh
ReferenceData data = getRefData();
var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}
Run Code Online (Sandbox Code Playgroud)
您的代码如下所示:
public void startSparkEngine() {
final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());
stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
});
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Run Code Online (Sandbox Code Playgroud)
这对我来说也适用于多集群.希望这可以帮助
最近遇到了这个问题。认为它可能对 Scala 用户有帮助..
Scala 的做法BroadCastWrapper就像下面的例子。
import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
/* wrapper lets us update brodcast variables within DStreams' foreachRDD
without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
Run Code Online (Sandbox Code Playgroud)
每次您需要调用更新函数来获取新的广播变量。
几乎每个处理流应用程序的人都需要一种方法来编织(过滤,查找等)参考数据(从数据库,文件等)到流数据.我们对这两部分进行了部分解决
查找要在流操作中使用的参考数据
在大多数情况下,这项工作正常,但以下情况除外
更新参考数据
尽管有这些线程中的建议,但没有明确的方法来实现这一点,即:杀死先前的广播变量并创建新的变量.多个未知数,例如这些操作之间的预期.
这是一个常见的需求,如果有办法将信息发送到广播变量通知更新,它会有所帮助.这样,就可以使"CacheLookup"中的本地缓存无效
问题的第二部分仍未解决.如果有任何可行的方法,我会感兴趣
不确定您是否已经尝试过此操作,但我认为可以在不关闭SparkContext. 通过使用该unpersist()方法,广播变量的副本在每个执行器上被删除,并且需要重新广播该变量才能再次访问。对于您的用例,当您想要更新广播时,您可以:
等待您的执行者完成当前的数据系列
取消保留广播变量
更新广播变量
重新广播以将新的参考数据发送给执行者
我从这篇文章中汲取了大量的经验,但最后一个回复的人声称已经在本地工作了。需要注意的是,您可能希望将阻塞设置true为非持久性,以便确保执行器删除旧数据(这样在下一次迭代时不会再次读取过时的值)。
| 归档时间: |
|
| 查看次数: |
15714 次 |
| 最近记录: |