Bin*_*Bin 5 python hadoop hive apache-spark pyspark
当对已经在同一节点上的数据使用Hive窗口函数时,是否会发生数据混洗?
具体在下面的示例中,在使用窗口函数数据之前,已经使用Spark repartition()函数通过'City'重新分区,这应该确保城市'A'的所有数据共同本地化在同一节点上(假设城市的数据可以适合在一个节点).
df = sqlContext.createDataFrame(
[('A', '1', 2009, "data1"),
('A', '1', 2015, "data2"),
('A', '22', 2015, "data3"),
('A', '22', 2016, "data4"),
('BB', '333', 2014, "data5"),
('BB', '333', 2012, "data6"),
('BB', '333', 2016, "data7")
],
("City", "Person","year", "data"))
df = df.repartition(2, 'City')
df.show()
# +----+------+----+-----+
# |City|Person|year| data|
# +----+------+----+-----+
# | BB| 333|2012|data6|
# | BB| 333|2014|data5|
# | BB| 333|2016|data7|
# | A| 22|2016|data4|
# | A| 22|2015|data3|
# | A| 1|2009|data1|
# | A| 1|2015|data2|
# +----+------+----+-----+
Run Code Online (Sandbox Code Playgroud)
然后我必须通过'Person'进行窗口函数分区,它不是Spark repartition()中的分区键,如下所示.
df.registerTempTable('example')
sqlStr = """\
select *,
row_number() over (partition by Person order by year desc) ranking
from example
"""
sqlContext.sql(sqlStr).show(100)
# +----+------+----+-----+-------+
# |City|Person|year| data|ranking|
# +----+------+----+-----+-------+
# | BB| 333|2016|data7| 1|
# | BB| 333|2014|data5| 2|
# | BB| 333|2012|data6| 3|
# | A| 1|2015|data2| 1|
# | A| 1|2009|data1| 2|
# | A| 22|2016|data4| 1|
# | A| 22|2015|data3| 2|
# +----+------+----+-----+-------+
Run Code Online (Sandbox Code Playgroud)
这是我的问题:
Spark"repartition"和Hive"partition by"之间是否有任何关系或区别?在引擎盖下,他们在Spark上翻译成同样的东西吗?
我想检查一下我的理解是否正确.即使所有数据已经在同一节点上,如果我调用Spark df.repartition('A_key_different_from_current_partidion_key'),数据也会被混洗到许多节点,而不是在同一节点上保持在一起.
顺便说一句,我也很好奇用Spark窗口函数实现示例Hive查询是否简单.
partition by窗口函数中的两个子句repartition都执行相同的TungstenExchange机制。当您分析执行计划时,您会看到这一点:
sqlContext.sql(sqlStr).explain()
## == Physical Plan ==
## Window [City#0,Person#1,year#2L,data#3], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(Person#1,year#2L DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS ranking#5], [Person#1], [year#2L DESC]
## +- Sort [Person#1 ASC,year#2L DESC], false, 0
## +- TungstenExchange hashpartitioning(Person#1,200), None <- PARTITION BY
## +- Project [City#0,Person#1,year#2L,data#3]
## +- TungstenExchange hashpartitioning(City#0,2), None <- REPARTITION
## +- ConvertToUnsafe
## +- Scan ExistingRDD[City#0,Person#1,year#2L,data#3]
Run Code Online (Sandbox Code Playgroud)
关于第二个问题,你的假设是正确的。即使数据已经位于单个节点上,Spark 也没有关于数据分布的先验知识,并且会再次对数据进行 shuffle。
最后,根据观点,您的查询已经是 Spark 查询,或者无法使用普通 Spark 执行此查询。
这是一个 Spark 查询,因为对应的 DSL 将使用完全相同的机制
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
w = Window.partitionBy("person").orderBy(col("year").desc())
df.withColumn("ranking", row_number().over(w))
Run Code Online (Sandbox Code Playgroud)使用普通 Spark 无法执行此操作,因为从 Spark 1.6 开始,没有窗口函数的本机实现。它在 Spark 2.0 中发生了变化。
| 归档时间: |
|
| 查看次数: |
1171 次 |
| 最近记录: |