Rai*_*eld 6 partitioning apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我需要join基于一些共享键列来组合许多DataFrame.对于键值RDD,可以指定分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在之前有一个shuffle相关操作join).可以在Spark DataFrames或DataSet上完成同样的事情吗?
Sil*_*vio 12
repartition如果你知道你将多次加入DataFrame,你可以在加载后加载它
val users = spark.read.load("/path/to/users").repartition('userId)
val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition
val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned
Run Code Online (Sandbox Code Playgroud)
因此,它会将数据混洗一次,然后在加入后续时间时重复使用随机播放文件.
但是,如果您知道您将反复对某些键上的数据进行混洗,那么最好的办法是将数据保存为分块表.这将把数据写入预先散列分区的数据,因此当您读取表并加入它们时,您可以避免随机播放.你可以这样做:
// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")
val users = spark.read.table("users")
val addresses = spark.read.table("addresses")
val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned
Run Code Online (Sandbox Code Playgroud)
为了避免混乱,桌子必须使用相同的桶(例如相同数量的桶并且在桶柱上连接).
使用该repartition方法可以使用 DataFrame/DataSet API 。使用这种方法,您可以指定一个或多个列用于数据分区,例如
val df2 = df.repartition($"colA", $"colB")
Run Code Online (Sandbox Code Playgroud)
也可以在同一命令中同时指定所需分区的数量,
val df2 = df.repartition(10, $"colA", $"colB")
Run Code Online (Sandbox Code Playgroud)
注意:这并不能保证数据帧的分区将位于同一节点上,只是分区以相同的方式完成。
| 归档时间: |
|
| 查看次数: |
11002 次 |
| 最近记录: |