Tw *_*Nus 15 apache-spark apache-spark-sql pyspark pyspark-sql
我的数据原则上是一个表,除了其他"数据"之外,它还包含一列ID和一列GROUP_ID.
在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板.第二步做了很多的groupBy('GROUP_ID')和Window.partitionBy('GROUP_ID').orderBy('ID').
现在的目标是 - 为了避免在第二步骤洗牌 - 在第一步骤中有效地加载数据,因为这是一定时器.
问题第1部分: AFAIK,Spark在从镶木地板加载时保留了分区(这实际上是任何"优化写入考虑"的基础) - 对吗?
我提出了三种可能性:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')我会设置n个别镶木地板文件大约100MB.
问第2部分:它是正确的,这三个选项的目标(避免在第2步洗牌)方面产生"相同" /类似的结果?如果没有,有什么区别?哪一个'更好'?
问题第3部分:关于步骤1,三个选项中哪一个表现更好?
感谢您分享您的知识!
编辑2017-07-24
在做了一些测试(写入和读取镶木地板)后,似乎Spark 在第二步中默认无法恢复partitionBy和orderBy信息.分区的数量(从df.rdd.getNumPartitions()似乎由核心数量和/或spark.default.parallelism(如果设置)确定,但不是由镶木地板分区的数量决定.因此问题1的答案是错误的,问题2和3将是无关紧要.
因此,原来的真正的问题是:有没有办法告诉星火,该数据已经被列分区X和列进行排序ÿ?
据我所知,没有办法从 parquet 读取数据并告诉 Spark 它已经按某种表达式分区并排序。
简而言之,HDFS 等上的一个文件对于一个 Spark 分区来说太大了。即使您将整个文件读取到一个分区,并使用 Parquet 属性(例如等) parquet.split.files=false,parquet.task.side.metadata=true与仅进行一次洗牌相比,成本也会最高。
| 归档时间: |
|
| 查看次数: |
1734 次 |
| 最近记录: |