d12*_*25q 5 scala apache-spark apache-spark-sql google-cloud-dataproc apache-spark-dataset
请考虑以下代码:
case class Person(
personId: Long, name: String, ageGroup: String, gender: String,
relationshipStatus: String, country: String, state: String
)
case class PerPersonPower(personId: Long, power: Double)
val people: Dataset[Person] = ... // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.
people.join(powers, "personId")
.groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
.agg(
sum("power").alias("totalPower"),
count("*").alias("personCount")
)
Run Code Online (Sandbox Code Playgroud)
它在具有大约100 GB RAM的群集上执行.但是,群集内存不足.我不知道该怎么做.实际上,people被分区$"personId"和缓存 - people.repartition($"personId").cache().
我有什么想法可以优化这个计算?
该集群是一个普通的Google Dataproc集群---因此它在客户端模式下使用YARN--由14个节点组成,每个节点具有8 GB RAM.
根据请求中可用的有限信息,我建议不要使用缓存并创建比默认数量多一点的分区(通常为 200,但每个集群可能有所不同) - 尝试在spark.shuffle.partitions应用程序中将其设置为 1000 或 2000 以启动和。它可以像这样完成spark.conf.set('spark.shuffle.partitions', 1000)。最有可能的是,您的查询命中了 SortMergeJoin,并且当前执行器获取的数据多于堆减去 YARN 开销的数据。请咨询您的SparkUI 以获取集群信息,以便监控和优化查询执行。在 SQL 选项卡中,您将看到有关每个阶段正在处理的数据量的非常详细的数字,因此您将识别瓶颈并更快地修复它们。
Spark 查询规划器将首先按中定义的 number 中的 personIdPerPersonPower进行排序,将其刷新到 HDFS 到单独的 parquet 文件中,然后创建相同数量的部分聚合并将它们放入生成的数据帧中。Personspark.shuffle.partitionsspark.shuffle.partitions
看来您正在加入大约 18-20GB(人)的数据和大约 800MB(电量)。如果功率小一点,您可以尝试使用BroadcastHashJoin之类的people.join(broadcast(powers), "personId"),但我不建议广播大于 128Mb 或 256Mb 的数据帧。
祝你好运!
| 归档时间: |
|
| 查看次数: |
536 次 |
| 最近记录: |