为什么 Spark 对大小大于 autoBroadcastJoinThreshold 的文件应用广播连接?

Sum*_*t G 3 apache-spark apache-spark-sql

我正在使用 Spark 3.1.1 并分别加入文件大小为 8.6Gb 和 25.2Mb 的两个 Dataframe,并且不应用任何过滤器。Spark 会自动使用 BroadcastHashJoin 来实现此目的,尽管spark.sql.autoBroadcastJoinThreshold默认值为 10Mb。

如何在不应用任何过滤器的情况下将 25.2Mb 转换为 8.1Mb 以获得广播资格?

val df1 = spark.read
  .option("header",true)
  .csv("s3a://data/staging/received/data/spark/3/KernelVersionOutputFiles.csv")
  .withColumn("Pid",substring(rand(),3,4).cast("bigint"))


val df2 = spark.read
  .option("header",true)
  .csv("s3a://data/staging/received/data/spark/3/ForumTopics.csv")
  .withColumn("Cid",substring(rand(),3,4).cast("bigint"))

val df3 = df2.coalesce(1)
val joinDf = df1.join(df3, df1("Pid") === df3("Cid"))
val cnt = joinDf.count()
Run Code Online (Sandbox Code Playgroud)

DAG 看起来像这样:

在此输入图像描述

mik*_*ike 5

Spark应用广播连接,因为csv中25MB的数据(“读取的文件大小”)在被Spark序列化时将低于10MB(“数据大小”)。

“读取的文件大小”显示的数量非常准确,因为 Spark 能够直接计算数据文件的统计信息。然而,DAG 中显示的“数据大小”会受到 SizeEstimator 不准确的影响

那里说:

“估计给定对象在 JVM 堆上占用的字节数。估计值包括给定对象引用的对象占用的空间、它们的引用等等。

这对于确定广播变量在每个执行器上占用的堆空间量或以反序列化形式缓存对象时每个对象将占用的空间量非常有用。这与对象的序列化大小不同,后者通常要小得多。”

如果您想获取 25MB csv 文件的实际大小,您可以缓存它并检查 WebUI 中的“存储”选项卡。

在我的测试用例中,虽然我也将autoBroadcastJoinThreshold默认配置保留为 10MB,但 Spark 应用了广播连接。对于大小为 14MB 的 json 文件,估计为 66MB。当我缓存它时,它显示的大小为 3.5MB,明显低于 10MB 的阈值。

下图是我的测试用例(和你的类似):

在此输入图像描述

下面的截图显示了数据的实际大小,只有3.5MB:

在此输入图像描述

Microsoft在此提供了关于此的另一个参考资料。