Val*_*ale 1 java architecture apache-spark spark-streaming
我有一个应用程序应该从文件读取一些行,并使它们成为一个最终变量,将用作参考.
截至目前,在spark上下文开始之前,我在一个类(称为People)类中启动了一个静态方法
reads the file;
fill a final static HashTable;
static{ hashTable.put(eachline);}
Run Code Online (Sandbox Code Playgroud)
在我的转换代码中,例如:
JavaRDD<String> filteredRDD = anotherRDD.filter( new Function<String,Boolean>(){
public Boolean call(String s){
People.hashTable.containsKey(s);
}
});
Run Code Online (Sandbox Code Playgroud)
释疑:
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStream").setMaster("local[2]");声明后立即使用广播变量吗?rdd.foreachPartition(....?中实例化classe ?是
不要使用静态变量来传递序列化数据.
通常,当我们将作业扩展到群集中的多个计算机时,带有数据的静态变量将无法与Spark一起使用(看起来它不起作用,b/c Spark正在local(2)模式下运行.
静态字段是对象初始化的一部分,并且不是序列化形式的一部分,因为它们可以/应该在远程操作中在接收侧重建.请注意,如果对象足够智能以在反序列化时重建其内容,它可以工作.
相反,我们可以使用可以序列化的普通实例.(喜欢mydata = new HashMap<>(); mydata.put(...)
假设我们有一个大型数据集,有420个分区和8个执行器节点的集群.在如下的操作中:
val referenceData = Map(...)
val filtered = rdd.filter(elem => referenceData.get(elem) > 10)
Run Code Online (Sandbox Code Playgroud)
该referenceData对象将被序列化420次,或者执行转换所需的任务数量.
而是一个广播变量:
val referenceDataBC = sparkContext.broadcast(Map(...))
val filtered = rdd.filter(elem => referenceDataBC.value.get(elem) > 10)
Run Code Online (Sandbox Code Playgroud)
将被送到每个遗嘱执行人一次,或总共8次.因此,通过减少序列化开销来节省大量网络和CPU.
我们需要在流式处理开始之前加载外部数据.我们还有其他选择吗?
取决于函数是否需要类提供的上下文.例如需要背景:
rdd.foreachPartition{ iter =>
val jsonParser = new JsonParser(validation)
val parsed = iter.map(jsonParser.parse)
...
}
Run Code Online (Sandbox Code Playgroud)
例如,不需要上下文
vectors.foreachPartition{ iter =>
val magnitudes = iter.map(elem => MyVectorMath.modulus(elem))
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3273 次 |
| 最近记录: |