spark.sql.shuffle.partitions和spark.default.parallelism有什么区别?

Edi*_*son 54 performance hadoop bigdata apache-spark apache-spark-sql

spark.sql.shuffle.partitions和之间有什么区别spark.default.parallelism

我试图将它们都设置为SparkSQL,但第二阶段的任务编号始终为200.

Sha*_*ica 67

这里的答案,spark.sql.shuffle.partitions配置为连接或聚合的数据混洗时使用的分区数.

spark.default.parallelism是分区的默认数量RDDS按喜欢变换返回join,reduceByKey以及parallelize不能由用户明确设置时.请注意,spark.default.parallelism似乎只适用于raw RDD,在使用数据帧时会被忽略.

如果您正在执行的任务不是连接或聚合,并且您正在使用数据框,那么设置这些将不会产生任何影响.但是,您可以通过在代码中调用df.repartition(numOfPartitions)(不要忘记将其分配给新分区)来自行设置分区数val.


要更改代码中的设置,您只需执行以下操作:

sqlContext.setConf("spark.sql.shuffle.partitions", "300")
sqlContext.setConf("spark.default.parallelism", "300")
Run Code Online (Sandbox Code Playgroud)

或者,您可以在将作业提交到群集时进行更改spark-submit:

./bin/spark-submit --conf spark.sql.shuffle.partitions=300 --conf spark.default.parallelism=300
Run Code Online (Sandbox Code Playgroud)

  • 关于这个数字应该是多少的任何建议......? (7认同)
  • 这里有点困惑。Spark.sql.shuffle.partitions 配置用于连接或聚合的分区。然后你说spark.default.parallelism用于像join、reduceByKey这样的转换。这些不也是连接或聚合吗? (4认同)

Ami*_*wal 15

spark.default.parallelism是 spark 设置的默认分区数,默认为 200。如果你想增加分区数,你可以应用属性spark.sql.shuffle.partitions来设置分区数spark 配置或在运行 spark SQL 时。

通常这个spark.sql.shuffle.partitions当我们有内存拥塞并且我们看到以下错误时会使用它:spark error:java.lang.IllegalArgumentException: Size exceeded Integer.MAX_VALUE

所以设置您可以为每个分区分配一个分区为 256 MB,您可以使用它来设置您的进程。

此外,如果分区数接近 2000,则将其增加到 2000 以上。由于 spark 为分区 < 2000 和 > 2000 应用不同的逻辑,这将通过减少内存占用来提高代码性能,因为如果 >2000,数据默认值会被高度压缩。

  • 你好。有点晚了,但是当分区数量超过 2000 时,您是否有关于 Spark (2.1.0 如果可能的话:D)的不同行为的任何来源?我找不到任何东西。 (2认同)

Koe*_*dlt 13

添加一些已经发布的精彩答案:

概括

  • spark.sql.shuffle.partitions
    • 确定默认情况下对数据帧/数据集进行广泛转换后将拥有多少个输出分区。
    • 其默认值为 200。
  • spark.default.parallelism
    • 是一个更复杂的参数,在 Spark 中更“深入”。它影响:
      • 对RDD进行广泛转换后,您将拥有多少个分区如果您未指定数量,则
      • sc.parallelize创建多少个分区
      • 执行时读入了多少个分区spark.read.csv,...
    • 它的默认值取决于您对哪种类型的集群执行哪种类型的操作。

更详细一点

spark.sql.shuffle.partitions

来自文档

为连接或聚合而打乱数据时使用的默认分区数。注意:对于结构化流,无法在从同一检查点位置重新启动查询之间更改此配置。

从 Spark 3.3.1(本文发布时的最新版本)中可以看出SQLConf.scalaspark.sql.shuffle.partitions默认值为 200。

val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
  .doc("The default number of partitions to use when shuffling data for joins or aggregations. " +
    "Note: For structured streaming, this configuration cannot be changed between query " +
    "restarts from the same checkpoint location.")
  .version("1.1.0")
  .intConf
  .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
  .createWithDefault(200)
Run Code Online (Sandbox Code Playgroud)

结论:这是一个可以对Dataframes/Datasets上的广泛转换(连接、排序等)产生直接影响的值。您将能够使用此参数配置这些宽转换的输出分区的数量。

spark.default.parallelism

来自文档

当用户未设置时,连接、reduceByKey 和并行化等转换返回的 RDD 中的默认分区数。

关于其默认值:

对于分布式洗牌操作(如reduceByKey和 )join,父 RDD 中的最大分区数。对于parallelize没有父 RDD 的操作,它取决于集群管理器:

  • 本地模式:本地机器的核心数
  • Mesos 细粒度模式:8
  • 其他:所有执行器节点上的核心总数或2,以较大者为准

所以我们已经看到这个参数有点复杂。它没有真正的默认值,但您可以设置它。如果我们看一些代码,这一点就会变得更清楚。

在 中SparkContext.scala,我们看到如何defaultParallelism定义:

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}
Run Code Online (Sandbox Code Playgroud)

所以我们看到这defaultParallelism取决于 taskScheduler 的类型(如文档状态)。让我们看看这些:

override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)
Run Code Online (Sandbox Code Playgroud)
override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
Run Code Online (Sandbox Code Playgroud)
  • 其他的CoarseGrainSchedulerBackend):
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
Run Code Online (Sandbox Code Playgroud)

好的,现在我们知道这个值有点复杂,让我们尝试找出它何时相关:

  • 在对RDD进行广泛转换时。joinreduceByKeygroupByKey等都使用defaultPartitionerif 没有给出分区作为输入参数。其中defaultPartitioner,spark.default.parallelism用于确定分区的数量。
  • sc.parallelize调用Seq. 根据您的集群管理器,您将获得多个输出分区,如上所述。
  • 读入数据时(例如spark.read.csv),会对读入多少个分区产生影响。在DataSourceScanExeccreateReadRDD函数中,读入的输出分区数量受函数影响maxSplitBytes,而函数本身又受正如这个 SO 答案spark.default.parallelism中所解释的
  • 我确信它被用在更多的地方,但我希望这已经给出了关于这个参数的更多直觉。