用于有效连接Spark数据帧/数据集的分区数据

Rai*_*eld 6 partitioning apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

我需要join基于一些共享键列来组合许多DataFrame.对于键值RDD,可以指定分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在之前有一个shuffle相关操作join).可以在Spark DataFrames或DataSet上完成同样的事情吗?

Sil*_*vio 12

repartition如果你知道你将多次加入DataFrame,你可以在加载后加载它

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned
Run Code Online (Sandbox Code Playgroud)

因此,它会将数据混洗一次,然后在加入后续时间时重复使用随机播放文件.

但是,如果您知道您将反复对某些键上的数据进行混洗,那么最好的办法是将数据保存为分块表.这将把数据写入预先散列分区的数据,因此当您读取表并加入它们时,您可以避免随机播放.你可以这样做:

// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned
Run Code Online (Sandbox Code Playgroud)

为了避免混乱,桌子必须使用相同的桶(例如相同数量的桶并且在桶柱上连接).

  • 我试图理解为什么 Spark 要优化第二个作业,需要通过 userId 显式重新分区。在第一个作业(需要洗牌)之后,Spark 是否会知道数据现在已按用户 ID 进行分区? (2认同)

Sha*_*ica 5

使用该repartition方法可以使用 DataFrame/DataSet API 。使用这种方法,您可以指定一个或多个列用于数据分区,例如

val df2 = df.repartition($"colA", $"colB")
Run Code Online (Sandbox Code Playgroud)

也可以在同一命令中同时指定所需分区的数量,

val df2 = df.repartition(10, $"colA", $"colB")
Run Code Online (Sandbox Code Playgroud)

注意:这并不能保证数据帧的分区将位于同一节点上,只是分区以相同的方式完成。