SparkSQL 查询计划中的 HashAggregate

mar*_*e20 4 sql-execution-plan apache-spark-sql

我只是想了解 SparkSQL (2.4) 中生成的查询计划。我有以下查询及其相应的查询计划(如下)。(该查询只是一个测试查询)。

create temporary view tab_in as
select distinct 
       mth_id 
from tgt_tbl;

select /*+ BROADCAST(c) */
a.mth_id,
a.qtr_id,
a.prod_id,
a.sale_date
from my_table a
left anti join tab_in c
on a.mth_id = c.mth_id;
Run Code Online (Sandbox Code Playgroud)

解释一下计划:

+- *(3) Project [mth_id#652, qtr_id#653, prod_id#655, sale_dt#656]
   +- *(3) BroadcastHashJoin [mth_id#652], [mth_id#867], LeftAnti, BuildRight
      :- *(3) Project [mth_id#652, qtr_id#653, sale_dt#656, prod_id#655]
      :  +- *(3) Filescan parquet test_db.my_table[mth_id#652, qtr_id#653, prod_id#655, sale_dt#656] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://test-data/my_table/0], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema ......
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
         +- *(2) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867]
            +- Exchange hashpartitioning(mth_id#867, 200)
               +- *(1) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
                  +- *(1) Project [mth_id#867]
                     +- *(1) Filter isnotnull(mth_id#867)
                        +- *(1) FileScan parquet test_db.my_table[mth_id#867] Batched:true, Format:  Parquet, Location: InMemoryFileIndex[s3://test-data/tgt_tbl/20200609], PartitionFilters: [], PushedFilters: [IsNotNull(mth_id)], ReadSchema struct<mth_id:int>
Run Code Online (Sandbox Code Playgroud)

从上面可以看出,HashAggregates计划中有 2 个正在执行 -1之前和1之后Exchange HashPartitioning。我认为第一个HashAggregate可能是由于DISTINCT第一个查询中存在该子句,但我似乎无法弄清楚第二个HashPartitioning(在交换之后)的原因。

我尝试通过将第一个查询放入子句中将两个查询合并为一个查询WITH CTE,但仍然得到相同的结果。

有人可以解释一下第二个的必要性吗(从下面阅读)HashAggregate

任何帮助表示赞赏。谢谢

Dav*_*rba 6

计划中的两者HashAggregates都是因为重复数据删除(distinct)。通常HashAggregate成对出现。这里第一个负责每个执行器上的本地重复数据删除。接下来Exchange- 数据必须被打乱,第二个HashAggregate负责打乱后的最终重复数据删除。