左连接操作永远运行

Mar*_*kus 0 scala join apache-spark apache-spark-sql

我必须应用左连接来连接我想要连接的数据帧。

df1 =

+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
|       560|            630|
|       710|            240|
|       610|            240|
Run Code Online (Sandbox Code Playgroud)

df2=

+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
|       560|            610|    1|
|       560|            240|    1|
|       610|            240|    0|
Run Code Online (Sandbox Code Playgroud)

问题是df1只包含 500 行,而df2包含 600.000.000 行和 24 个分区。我的左连接需要一段时间才能执行。我等了5个小时还没完。

val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)

结果应包含 500 行。我使用以下参数从 Spark-shell 执行代码:

spark-shell -driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4
Run Code Online (Sandbox Code Playgroud)

我怎样才能加快这个过程?

更新:

的输出df2.explain(true)

val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)

Ass*_*son 5

您可能应该使用不同类型的联接。默认情况下,您进行的联接假设两个数据帧都很大,因此会进行大量的洗牌(通常会对每一行进行哈希处理,数据将根据哈希进行洗牌,然后将完成每个执行程序联接)。您可以通过在结果上键入 usingexplain 来查看执行计划。

相反,请考虑使用广播提示:

val result = df2.join(broadcast(df1),Seq("product_PK","rec_product_PK"),"right")
Run Code Online (Sandbox Code Playgroud)

请注意,我翻转了连接顺序,以便广播出现在连接参数中。广播函数是 org.apache.spark.sql.functions 的一部分

这将执行广播连接,df1 将被复制到所有执行器,并且连接将在本地完成,从而避免需要对大型 df2 进行洗牌。