Apache Spark:重新分区、排序和缓存对连接的影响

Daw*_*wid 10 bigdata apache-spark pyspark azure-databricks delta-lake

我正在探索将表加入到自身时 Spark 的行为。我正在使用数据块。

我的虚拟场景是:

  1. 将外部表读取为数据帧 A(底层文件采用 delta 格式)

  2. 将数据框 B 定义为仅选择某些列的数据框 A

  3. 在 column1 和 column2 上连接数据框 A 和 B

(是的,这没有多大意义,我只是在尝试了解 Spark 的底层机制)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])
Run Code Online (Sandbox Code Playgroud)

我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)

最后,我重新分区、排序和缓存

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Run Code Online (Sandbox Code Playgroud)

生成的相应 dag 如下所示。

我的问题是:

  1. 为什么在尝试 1 中,即使没有明确指定缓存,表似乎也被缓存了。

  2. 为什么 InMemoreTableScan 后面总是跟有这种类型的另一个节点。

  3. 为什么在尝试 3 中缓存似乎发生在两个阶段?

  4. 为什么在尝试 3 WholeStageCodegen 之后跟随一个(并且只有一个)InMemoreTableScan。

尝试 1

尝试 2

在此处输入图片说明

Ash*_*ngh 5

您在这 3 个计划中观察到的是 DataBricks 运行时和 Spark 的混合。

首先,在运行 DataBricks 运行时 3.3+ 时,会自动为所有镶木地板文件启用缓存。相应的配置: spark.databricks.io.cache.enabled true

对于您的第二个查询,InMemoryTableScan发生了两次,因为在调用 join 时,spark 尝试并行计算数据集 A 和数据集 B。假设不同的执行者被分配了上述任务,两者都必须从(DataBricks)缓存中扫描表。

对于第三个,InMemoryTableScan本身并不是指缓存。这只是意味着无论形成何种计划催化剂都涉及多次扫描缓存表。

PS:我无法想象第 4 点:)