Mar*_*kus 0 scala join apache-spark apache-spark-sql
我必须应用左连接来连接我想要连接的数据帧。
df1 =
+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
| 560| 630|
| 710| 240|
| 610| 240|
Run Code Online (Sandbox Code Playgroud)
df2=
+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
| 560| 610| 1|
| 560| 240| 1|
| 610| 240| 0|
Run Code Online (Sandbox Code Playgroud)
问题是df1只包含 500 行,而df2包含 600.000.000 行和 24 个分区。我的左连接需要一段时间才能执行。我等了5个小时还没完。
val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)
结果应包含 500 行。我使用以下参数从 Spark-shell 执行代码:
spark-shell -driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4
Run Code Online (Sandbox Code Playgroud)
我怎样才能加快这个过程?
更新:
的输出df2.explain(true):
val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)
您可能应该使用不同类型的联接。默认情况下,您进行的联接假设两个数据帧都很大,因此会进行大量的洗牌(通常会对每一行进行哈希处理,数据将根据哈希进行洗牌,然后将完成每个执行程序联接)。您可以通过在结果上键入 usingexplain 来查看执行计划。
相反,请考虑使用广播提示:
val result = df2.join(broadcast(df1),Seq("product_PK","rec_product_PK"),"right")
Run Code Online (Sandbox Code Playgroud)
请注意,我翻转了连接顺序,以便广播出现在连接参数中。广播函数是 org.apache.spark.sql.functions 的一部分
这将执行广播连接,df1 将被复制到所有执行器,并且连接将在本地完成,从而避免需要对大型 df2 进行洗牌。
| 归档时间: |
|
| 查看次数: |
3403 次 |
| 最近记录: |