spark.sql.shuffle.partitions的最佳值应该是什么,或者在使用Spark SQL时如何增加分区?

Ume*_*cha 36 apache-spark apache-spark-sql

嗨,我实际上使用Spark SQL hiveContext.sql(),它使用查询组,我遇到了OOM问题.因此,考虑将spark.sql.shuffle.partitions200的默认值增加到1000,但它没有帮助.请纠正我,如果我错了,这个分区将共享数据shuffle load,所以分区更少数据保持.请指导我是Spark新手.我正在使用Spark 1.4.0,我有大约1TB的未压缩数据,可以使用hiveContext.sql()group by queries 进行处理.

non*_*ont 49

如果shuffle上的内存不足,请尝试设置spark.sql.shuffle.partitions为2001.

当分区数大于2000时,Spark使用不同的数据结构进行随机记录保存:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...
Run Code Online (Sandbox Code Playgroud)

我真的希望他们能让你独立配置它.

顺便说一下,我在Cloudera幻灯片中找到了这些信息.

  • 在 Spark 2.4.x 之前,此信息是准确的。请查看spark.shuffle.minNumPartitionsToHighlycompress 要点是,您可以使用任意数量的分区来使用HighlyCompressedMapStatus,这让您可以毫不妥协地选择最佳并行度级别 (3认同)

sam*_*est 9

好的,所以我认为你的问题更为笼统.它不是特定于Spark SQL,它是Spark的一个普遍问题,它忽略了文件很少时告诉它的分区数量.除非你打电话,否则Spark似乎与HDFS上的文件数有相同的分区数repartition.所以打电话repartition应该工作,但有一点需要引起洗牌.

我刚才提出这个问题,还没有得到一个好的答案:(

Spark:增加分区数量而不会导致shuffle?