什么时候在Apache Spark中发生改组?

cnn*_*znn 7 mapreduce apache-spark

我正在优化Spark中的参数,并且想知道Spark是如何改组数据的.

确切地说,我有一个简单的字数统计程序,并想知道spark.shuffle.file.buffer.kb如何影响运行时间.现在,当我将此参数设置得非常高时,我只看到减速(我猜这会阻止每个任务的缓冲区同时适应内存).

有人可以解释Spark是如何进行减少的吗?例如,在RDD中读取和分区数据,并且当调用"action"函数时,Spark会将任务发送到工作节点.如果操作是减少,Spark如何处理此问题,以及与此过程相关的shuffle文件/缓冲区如何?

eli*_*sah 16

问题:关于何时在Spark上触发洗牌的问题

答案:任何join,cogroupByKey操作涉及将对象保存在哈希映射或内存缓冲区中以进行分组或排序.join,cogroupgroupByKey在这些任务中使用这些数据结构,用于它们触发的混洗的提取侧的阶段.reduceByKeyaggregateByKey在他们触发的洗牌两侧的阶段的任务中使用数据结构.

说明:如何在Spark中进行shuffle操作?

与Hadoop相比,Spark中的shuffle操作实现方式不同.我不知道你是否熟悉它如何与Hadoop一起使用,但现在让我们专注于Spark.

地图方面,Spark中的每个map任务都为每个reducer写出一个shuffle文件(os磁盘缓冲区) - 它对应于Spark中的逻辑块.这些文件不是中介,因为Spark不会将它们合并到更大的分区文件中.由于Spark中的调度开销较小,因此mappers(M)和reducers(R)的数量远远高于Hadoop.因此,将M*R文件运送到相应的减速器可能导致显着的开销.

与Hadoop类似,Spark还提供了一个参数spark.shuffle.compress来指定压缩映射输出的压缩库.在这种情况下,它可能是Snappy(默认情况下)或LZF.Snappy每个打开的文件只使用33KB的缓冲区,并显着降低了遇到内存不足错误的风险.

reduce方面,Spark要求所有混洗数据都适合相应reducer任务的内存,相反Hadoop可以选择将其溢出到磁盘上.例如,这当然只会在reducer任务需要一个GroupByKey或一个ReduceByKey操作的所有混洗数据的情况下发生.在这种情况下,Spark会抛出一个内存不足的异常,这对于开发人员来说已经证明是一个挑战.

同样使用Spark,没有重叠的复制阶段,不像Hadoop具有重叠的复制阶段,即使在映射完成之前,映射器也会将数据推送到reducer.这意味着与Hadoop中的推送操作相比,shuffle是Spark中的pull操作.每个reducer还应该维护一个网络缓冲区来获取map输出.此缓冲区的大小通过参数指定(默认情况下为48MB).spark.reducer.maxMbInFlight

有关在Apache Spark中进行混洗的更多信息,我建议您阅读以下内容: