Understanding Spark physical plans

cod*_*din 20 sql catalyst query-optimization apache-spark apache-spark-sql

I'm trying to understand Spark's physical plans, but I don't understand some parts because they look different from a traditional RDBMS. For example, the plan below is for a query against a Hive table. The query is:

select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus;


== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None

What I understand in the plan is:

  1. First it starts with a scan of the Hive table

  2. Then it filters using the condition

  3. Then it projects to get the columns we want

  4. Then TungstenAggregate?

  5. Then TungstenExchange?

  6. Then TungstenAggregate again?

  7. Then ConvertToSafe?

  8. Then it sorts the final result

But I don't understand steps 4, 5, 6 and 7. Do you know what they are? I've been looking for information on this so I can understand the plan, but I can't find anything concrete.

zer*_*323 25

Let's look at the structure of the SQL query you use:

SELECT
    ...  -- not aggregated columns  #1
    ...  -- aggregated columns      #2
FROM
    ...                          -- #3
WHERE
    ...                          -- #4
GROUP BY
    ...                          -- #5
ORDER BY
    ...                          -- #6

As you already suspect:

  • Filter (...) corresponds to the predicate in the WHERE clause (#4)
  • Project ... limits the number of columns to those required by the union of #1, #2, and #4 / #6 (if not present in the SELECT)
  • HiveTableScan corresponds to the FROM clause (#3); see the sketch after this list
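
A minimal DataFrame-API sketch of that Filter / Project / scan shape (an assumption on my side: a HiveContext bound to sqlContext, as in Spark 1.6):

sqlContext.table("lineitem")              // -> HiveTableScan (FROM, #3)
  .filter("l_shipdate <= '1998-09-16'")   // -> Filter (WHERE, #4)
  .select("l_returnflag", "l_quantity")   // -> Project (#1)
  .explain()                              // prints the physical plan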

The remaining parts can be attributed as follows:

  • #2 from the SELECT clause: the functions field in TungstenAggregates
  • the GROUP BY clause (#5):

    • TungstenExchange / hash partitioning
    • the key field in TungstenAggregates
  • #6: the ORDER BY clause (the explain() sketch below shows the whole mapping)
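
You can reproduce this whole mapping by printing the plan yourself. A minimal sketch, assuming the same sqlContext and the lineitem table from the question (explain(true) additionally prints the parsed, analyzed and optimized logical plans):

sqlContext.sql("""
  SELECT l_returnflag, sum(l_quantity) AS sum_qty  -- #1, #2
  FROM lineitem                                    -- #3
  WHERE l_shipdate <= '1998-09-16'                 -- #4
  GROUP BY l_returnflag                            -- #5
  ORDER BY l_returnflag                            -- #6
""").explain()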

Project Tungsten in general describes a set of optimizations used by Spark DataFrames (and Datasets), including:

  • explicit memory management with sun.misc.Unsafe. This means "native" (off-heap) memory usage and explicit memory allocation / freeing outside of GC management. These conversions correspond to the ConvertToUnsafe / ConvertToSafe steps in the execution plan. You can learn some interesting details about unsafe from Understanding sun.misc.Unsafe (a toy sketch follows this list)
  • code generation: various meta-programming tricks designed to generate code that is better optimized during compilation. You can think of it as an internal Spark compiler that rewrites nice functional code into ugly for loops.
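
To make the first bullet concrete, here is a toy JVM-level sketch (plain Scala, not Spark internals) of the kind of explicit off-heap memory management sun.misc.Unsafe enables; the reflection step is needed because the class cannot be instantiated directly:

import sun.misc.Unsafe

// Grab the Unsafe singleton via reflection (its constructor is private).
val field = classOf[Unsafe].getDeclaredField("theUnsafe")
field.setAccessible(true)
val unsafe = field.get(null).asInstanceOf[Unsafe]

val address = unsafe.allocateMemory(8)  // 8 bytes off-heap, invisible to the GC
unsafe.putLong(address, 42L)            // write straight to raw memory
println(unsafe.getLong(address))        // read it back: 42
unsafe.freeMemory(address)              // explicit free; no GC involvement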

You can learn more about Tungsten in general from Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter provides some code generation examples.

TungstenAggregate occurs twice because the data is first aggregated locally on each partition, then shuffled, and finally merged. If you are familiar with the RDD API, this process is roughly equivalent to reduceByKey.
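
A minimal RDD sketch of that two-phase shape, with hypothetical data (reduceByKey combines values locally within each partition before shuffling, then merges the partial results after the shuffle, matching the Partial / Final modes of the two TungstenAggregate nodes):

val pairs = sc.parallelize(Seq(("A", 1.0), ("B", 2.0), ("A", 3.0)), 2)
val totals = pairs.reduceByKey(_ + _)  // map-side combine, shuffle, final merge
totals.collect().foreach(println)      // (A,4.0), (B,2.0)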

If the execution plan is not clear, you can also try converting the resulting DataFrame to an RDD and analyzing the output of toDebugString.
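
For example, a small sketch assuming df holds the DataFrame produced by the query above:

println(df.rdd.toDebugString)  // RDD lineage; indentation marks shuffle boundaries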