Spark程序结构:类中的广播变量vs最终静态与外部静态属性

Val*_*ale 1 java architecture apache-spark spark-streaming

我有一个应用程序应该从文件读取一些行,并使它们成为一个最终变量,将用作参考.
截至目前,在spark上下文开始之前,我在一个类(称为People)类中启动了一个静态方法

reads the file;
fill a final static HashTable;
static{ hashTable.put(eachline);}
Run Code Online (Sandbox Code Playgroud)

在我的转换代码中,例如:

JavaRDD<String> filteredRDD = anotherRDD.filter( new Function<String,Boolean>(){
    public Boolean call(String s){
        People.hashTable.containsKey(s);
    }
});
Run Code Online (Sandbox Code Playgroud)

释疑:

  1. 我应该在SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStream").setMaster("local[2]");声明后立即使用广播变量吗?
  2. 为什么我要在最后一个选择广播变量?据我所知,决赛在转型流程中顺利通过.
  3. 在SparkStreaming计算开始之前加载文件内容的某个位置是正确/优雅的过程吗?
  4. 如果我有外部类来处理一些计算(主要是为了可读性),我是否更愿意以静态方式访问这些方法或者在一个rdd.foreachPartition(....?中实例化classe ?

maa*_*asg 8

1)我们应该使用广播变量吗?

2)广播变量与静态初始化变量

不要使用静态变量来传递序列化数据.

通常,当我们将作业扩展到群集中的多个计算机时,带有数据的静态变量将无法与Spark一起使用(看起来它不起作用,b/c Spark正在local(2)模式下运行.

静态字段是对象初始化的一部分,并且不是序列化形式的一部分,因为它们可以/应该在远程操作中在接收侧重建.请注意,如果对象足够智能以在反序列化时重建其内容,它可以工作.

相反,我们可以使用可以序列化的普通实例.(喜欢mydata = new HashMap<>(); mydata.put(...)

2.1)实例变量与广播变量

假设我们有一个大型数据集,有420个分区和8个执行器节点的集群.在如下的操作中:

val referenceData = Map(...)
val filtered = rdd.filter(elem => referenceData.get(elem) > 10)
Run Code Online (Sandbox Code Playgroud)

referenceData对象将被序列化420次,或者执行转换所需的任务数量.

而是一个广播变量:

val referenceDataBC = sparkContext.broadcast(Map(...))
val filtered = rdd.filter(elem => referenceDataBC.value.get(elem) > 10)
Run Code Online (Sandbox Code Playgroud)

将被送到每个遗嘱执行人一次,或总共8次.因此,通过减少序列化开销来节省大量网络和CPU.

3)在SparkStreaming计算开始之前加载文件内容的某个地方是正确/优雅的过程吗?

我们需要在流式处理开始之前加载外部数据.我们还有其他选择吗?

4)以静态方式使用函数或在rdd.foreachPartition(....)中实例化类

取决于函数是否需要类提供的上下文.例如需要背景:

rdd.foreachPartition{ iter =>
    val jsonParser = new JsonParser(validation)
    val parsed = iter.map(jsonParser.parse)
    ...
}
Run Code Online (Sandbox Code Playgroud)

例如,不需要上下文

vectors.foreachPartition{ iter =>
    val magnitudes = iter.map(elem => MyVectorMath.modulus(elem))
}
Run Code Online (Sandbox Code Playgroud)