Dan*_*elM 11 apache-spark apache-spark-sql
考虑以下运行GROUP BY具有相对大量聚合和相对大量组的a的示例:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
Run Code Online (Sandbox Code Playgroud)
这个作业的输入只有8MB,输出大约2.4GB,我在一个集群上运行它,每个工作机器有61GB内存.结果:所有工作程序都因OutOfMemory异常而崩溃.num_columns由于GC开销,即使工作值较低也会变得非常慢.
我们尝试的事情包括:
有没有更好的方法来达到预期的效果?
一般来说,解决此类问题的几乎通用方法是将分区大小保持在合理的大小。虽然“合理”有点主观,并且可能因情况而异,但 100-200MB 看起来是一个不错的起点。
我可以轻松聚合您在单个工作线程上提供的示例数据,保持默认值spark.executor.memory(1GB) 并将总可用资源限制为 8 个内核和 8GB RAM。所有这些都是通过使用 50 个分区并将聚合时间保持在 3 秒左右而无需任何特殊调整(这在 1.5.2 到 2.0.0 之间或多或少是一致的)。
总结一下:如果可能的话spark.default.parallelism,在创建时增加或明确设置分区数量DataFrame。对于像这样的小型数据集来说,默认值spark.sql.shuffle.partitions应该足够了。
| 归档时间: |
|
| 查看次数: |
1983 次 |
| 最近记录: |