Hive和Spark窗口函数的数据随机播放

Bin*_*Bin 5 python hadoop hive apache-spark pyspark

当对已经在同一节点上的数据使用Hive窗口函数时,是否会发生数据混洗?

具体在下面的示例中,在使用窗口函数数据之前,已经使用Spark repartition()函数通过'City'重新分区,这应该确保城市'A'的所有数据共同本地化在同一节点上(假设城市的数据可以适合在一个节点).

df = sqlContext.createDataFrame(
    [('A', '1', 2009, "data1"),
     ('A', '1', 2015, "data2"),
     ('A', '22', 2015, "data3"),
     ('A', '22', 2016, "data4"),
     ('BB', '333', 2014, "data5"), 
     ('BB', '333', 2012, "data6"), 
     ('BB', '333', 2016, "data7")
    ],
    ("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# |  BB|   333|2012|data6|
# |  BB|   333|2014|data5|
# |  BB|   333|2016|data7|
# |   A|    22|2016|data4|
# |   A|    22|2015|data3|
# |   A|     1|2009|data1|
# |   A|     1|2015|data2|
# +----+------+----+-----+
Run Code Online (Sandbox Code Playgroud)

然后我必须通过'Person'进行窗口函数分区,它不是Spark repartition()中的分区键,如下所示.

df.registerTempTable('example')
sqlStr = """\
    select *,
        row_number() over (partition by Person order by year desc) ranking
    from example
"""
sqlContext.sql(sqlStr).show(100)

# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# |  BB|   333|2016|data7|      1|
# |  BB|   333|2014|data5|      2|
# |  BB|   333|2012|data6|      3|
# |   A|     1|2015|data2|      1|
# |   A|     1|2009|data1|      2|
# |   A|    22|2016|data4|      1|
# |   A|    22|2015|data3|      2|
# +----+------+----+-----+-------+
Run Code Online (Sandbox Code Playgroud)

这是我的问题:

  1. Spark"repartition"和Hive"partition by"之间是否有任何关系或区别?在引擎盖下,他们在Spark上翻译成同样的东西吗?

  2. 我想检查一下我的理解是否正确.即使所有数据已经​​在同一节点上,如果我调用Spark df.repartition('A_key_different_from_current_partidion_key'),数据也会被混洗到许多节点,而不是在同一节点上保持在一起.

顺便说一句,我也很好奇用Spark窗口函数实现示例Hive查询是否简单.

zer*_*323 5

partition by窗口函数中的两个子句repartition都执行相同的TungstenExchange机制。当您分析执行计划时,您会看到这一点:

sqlContext.sql(sqlStr).explain()

## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
##    +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
##       +- Project [City#0,Person#1,year#2L,data#3]
##          +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
##             +- ConvertToUnsafe
##                +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]
Run Code Online (Sandbox Code Playgroud)

关于第二个问题,你的假设是正确的。即使数据已经位于单个节点上,Spark 也没有关于数据分布的先验知识,并且会再次对数据进行 shuffle。

最后,根据观点,您的查询已经是 Spark 查询,或者无法使用普通 Spark 执行此查询。

  • 这是一个 Spark 查询,因为对应的 DSL 将使用完全相同的机制

    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    
    w = Window.partitionBy("person").orderBy(col("year").desc())
    df.withColumn("ranking", row_number().over(w))
    
    Run Code Online (Sandbox Code Playgroud)
  • 使用普通 Spark 无法执行此操作,因为从 Spark 1.6 开始,没有窗口函数的本机实现。它在 Spark 2.0 中发生了变化。