Spark:如何使用 crossJoin

Mpi*_*ris 2 scala apache-spark

我有两个数据框。df1有 100000 行,df2有 10000 行。我想创建一个df3是两者的交叉连接:

val df3 = df1.crossJoin(df2)
Run Code Online (Sandbox Code Playgroud)

这将产生 10 亿行。试图在本地运行它,但似乎需要永远。你觉得本地可以做吗?

如果不是,哪种配置可以优化在云上运行它的时间?

gee*_*015 7

首先,我同意 – Mpizos Dimitris 您应该尝试提供所有可能的细节以获得最佳解决方案。

但下面是一种可能但可能不是有效方法。

1.缓存并重新分区小DataFrame df2 。确保通过重新分区,数据均匀分布在所有工作人员中,以便您可以使用尽可能多的任务。假设您的集群有 20 个工作线程,每个工作线程有 4 个内核。因此,您总共需要 4 x 20 = 80 个分区。

df2.repartition(80).cache()
Run Code Online (Sandbox Code Playgroud)

2.在DF2上执行一个动作,让缓存发生在作业开始之前。检查SparkUI的存储选项卡,确保产品DF缓存已经分布在所有节点上。

df2.show(10)
Run Code Online (Sandbox Code Playgroud)

3.暂时禁用广播加入。广播连接不适用于笛卡尔积,因为工作人员获得了太多的广播数据,他们陷入无限的垃圾收集循环并且永远无法完成。请记住在查询完成后重新打开它。您可以设置以下配置以禁用 BC 加入。

spark.sql.autoBroadcastJoinThreshold = 0
Run Code Online (Sandbox Code Playgroud)

4. 不使用连接条件将 DF1 与 DF2 连接。

val crossJoined = df1.join(df2)
Run Code Online (Sandbox Code Playgroud)

5.在执行之前在DataFrame上运行一个解释计划以确认你有一个笛卡尔乘积操作。

crossJoined.explain
Run Code Online (Sandbox Code Playgroud)

  • `spark.sql.autoBroadcastJoinThreshold = 0` 对于 Spark 2.*,您需要进行如下设置。`spark.conf.set("spark.sql.partitions.autoBroadcastJoinThreshold", -1)` [src](https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint -for-sql-查询) (2认同)