Pat*_*oin 41 apache-spark apache-spark-sql
我想修复/合并我的数据,以便将其保存到每个分区的一个Parquet文件中.我还想使用Spark SQL partitionBy API.所以我可以这样做:
df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append).parquet(s"$location")
Run Code Online (Sandbox Code Playgroud)
我已经测试了这个并且它似乎表现不佳.这是因为在数据集中只有一个分区可以处理,文件的所有分区,压缩和保存都必须由一个CPU内核完成.
在调用coalesce之前,我可以重写这个来手动执行分区(使用带有不同分区值的过滤器).
但是使用标准的Spark SQL API有更好的方法吗?
mor*_*ada 79
我有完全相同的问题,我找到了一种方法来使用它DataFrame.repartition().使用的问题coalesce(1)是您的并行性降至1,并且它最多可能很慢并且最坏时出错.增加这个数字也无济于事 - 如果你coalesce(10)得到更多的并行性,但最终每个分区有10个文件.
要在不使用的情况下为每个分区获取一个文件coalesce(),请使用repartition()您想要对输出进行分区的相同列.所以在你的情况下,这样做:
import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")
Run Code Online (Sandbox Code Playgroud)
一旦我这样做,我得到每个输出分区一个镶木地板文件,而不是多个文件.
我在Python中对此进行了测试,但我认为在Scala中它应该是相同的.
eli*_*sah 10
根据定义:
coalesce(numPartitions:Int):DataFrame返回一个具有正确numPartitions分区的新DataFrame.
您可以使用它来使用numPartitions参数减少RDD/DataFrame中的分区数.在过滤大型数据集后,它可以更有效地运行操作.
关于你的代码,它表现不佳,因为你实际做的是:
将所有内容放入1个分区,这会使驱动程序重载,因为它会将所有数据拉入驱动程序的1个分区(这也不是一个好习惯)
coalesce 实际上洗牌网络上的所有数据也可能导致性能下降.
随机播放是Spark的重新分发数据的机制,因此它可以跨分区进行不同的分组.这通常涉及跨执行程序和机器复制数据,使得混洗成为复杂且昂贵的操作.
该洗牌的概念是非常重要的管理和理解.由于涉及磁盘I/O,数据序列化和网络I/O,因此它是一项昂贵的操作,因此总是最好将最小化进行洗牌.为了组织shuffle的数据,Spark生成了一系列任务 - 映射任务以组织数据,以及一组reduce任务来聚合它.这个术语来自MapReduce,并不直接与Spark的地图和减少操作相关.
在内部,各个地图任务的结果会保留在内存中,直到它们无法适应.然后,这些基于目标分区进行排序并写入单个文件.在reduce方面,任务读取相关的排序块.
关于分区镶木地板,我建议你阅读的答案在这里大约有木地板分区星火DataFrames也是本节星火编程指南中的性能优化.
我希望这有帮助 !
| 归档时间: |
|
| 查看次数: |
42906 次 |
| 最近记录: |