Edi*_*son 54 performance hadoop bigdata apache-spark apache-spark-sql
spark.sql.shuffle.partitions和之间有什么区别spark.default.parallelism?
我试图将它们都设置为SparkSQL,但第二阶段的任务编号始终为200.
Sha*_*ica 67
从这里的答案,spark.sql.shuffle.partitions配置为连接或聚合的数据混洗时使用的分区数.
spark.default.parallelism是分区的默认数量RDDS按喜欢变换返回join,reduceByKey以及parallelize不能由用户明确设置时.请注意,spark.default.parallelism似乎只适用于raw RDD,在使用数据帧时会被忽略.
如果您正在执行的任务不是连接或聚合,并且您正在使用数据框,那么设置这些将不会产生任何影响.但是,您可以通过在代码中调用df.repartition(numOfPartitions)(不要忘记将其分配给新分区)来自行设置分区数val.
要更改代码中的设置,您只需执行以下操作:
sqlContext.setConf("spark.sql.shuffle.partitions", "300")
sqlContext.setConf("spark.default.parallelism", "300")
Run Code Online (Sandbox Code Playgroud)
或者,您可以在将作业提交到群集时进行更改spark-submit:
./bin/spark-submit --conf spark.sql.shuffle.partitions=300 --conf spark.default.parallelism=300
Run Code Online (Sandbox Code Playgroud)
Ami*_*wal 15
spark.default.parallelism是 spark 设置的默认分区数,默认为 200。如果你想增加分区数,你可以应用属性spark.sql.shuffle.partitions来设置分区数spark 配置或在运行 spark SQL 时。
通常这个spark.sql.shuffle.partitions当我们有内存拥塞并且我们看到以下错误时会使用它:spark error:java.lang.IllegalArgumentException: Size exceeded Integer.MAX_VALUE
所以设置您可以为每个分区分配一个分区为 256 MB,您可以使用它来设置您的进程。
此外,如果分区数接近 2000,则将其增加到 2000 以上。由于 spark 为分区 < 2000 和 > 2000 应用不同的逻辑,这将通过减少内存占用来提高代码性能,因为如果 >2000,数据默认值会被高度压缩。
Koe*_*dlt 13
添加一些已经发布的精彩答案:
spark.sql.shuffle.partitions:
spark.default.parallelism:
sc.parallelize创建多少个分区spark.read.csv,...spark.sql.shuffle.partitions来自文档:
为连接或聚合而打乱数据时使用的默认分区数。注意:对于结构化流,无法在从同一检查点位置重新启动查询之间更改此配置。
从 Spark 3.3.1(本文发布时的最新版本)中可以看出SQLConf.scala,spark.sql.shuffle.partitions默认值为 200。
val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
.doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
"Note: For structured streaming, this configuration cannot be changed between query " +
"restarts from the same checkpoint location.")
.version("1.1.0")
.intConf
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
.createWithDefault(200)
Run Code Online (Sandbox Code Playgroud)
结论:这是一个可以对Dataframes/Datasets上的广泛转换(连接、排序等)产生直接影响的值。您将能够使用此参数配置这些宽转换的输出分区的数量。
spark.default.parallelism来自文档:
当用户未设置时,连接、reduceByKey 和并行化等转换返回的 RDD 中的默认分区数。
关于其默认值:
对于分布式洗牌操作(如
reduceByKey和 )join,父 RDD 中的最大分区数。对于parallelize没有父 RDD 的操作,它取决于集群管理器:
- 本地模式:本地机器的核心数
- Mesos 细粒度模式:8
- 其他:所有执行器节点上的核心总数或2,以较大者为准
所以我们已经看到这个参数有点复杂。它没有真正的默认值,但您可以设置它。如果我们看一些代码,这一点就会变得更清楚。
在 中SparkContext.scala,我们看到如何defaultParallelism定义:
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
Run Code Online (Sandbox Code Playgroud)
所以我们看到这defaultParallelism取决于 taskScheduler 的类型(如文档状态)。让我们看看这些:
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
Run Code Online (Sandbox Code Playgroud)
override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
Run Code Online (Sandbox Code Playgroud)
CoarseGrainSchedulerBackend):override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
Run Code Online (Sandbox Code Playgroud)
好的,现在我们知道这个值有点复杂,让我们尝试找出它何时相关:
defaultPartitionerif 没有给出分区作为输入参数。其中defaultPartitioner,spark.default.parallelism用于确定分区的数量。sc.parallelize调用Seq. 根据您的集群管理器,您将获得多个输出分区,如上所述。spark.read.csv),会对读入多少个分区产生影响。在DataSourceScanExec的createReadRDD函数中,读入的输出分区数量受函数影响maxSplitBytes,而函数本身又受正如这个 SO 答案spark.default.parallelism中所解释的。| 归档时间: |
|
| 查看次数: |
58923 次 |
| 最近记录: |