Kai*_*Kai 32 dataframe apache-spark apache-spark-sql
这个问题并不新鲜,但我在Spark中发现了令人惊讶的行为.我需要向DataFrame添加一列行ID.我使用了DataFrame方法monotonically_increasing_id(),它确实给了我一个额外的uniques行ID(顺便说一句,它们不是连续的,但是是唯一的).
我遇到的问题是,当我过滤DataFrame时,重新分配生成的DataFrame中的行ID.两个DataFrame如下所示.
第一个是添加了行ID的初始DataFrame,如下所示:
df.withColumn("rowId", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)第二个DataFrame是在col P via上过滤后获得的数据帧df.filter(col("P"))
.
问题由custId 169的rowId说明,在初始DataFrame中为5,但在过滤后,当custId 169被过滤掉时,rowId(5)被重新分配给custmId 773!我不知道为什么这是默认行为.
我希望rowIds
它"粘"; 如果我从DataFrame中删除行,我不希望他们的ID"重新使用",我希望它们与行一起消失.有可能吗?我没有看到任何标志从monotonically_increasing_id
方法请求此行为.
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Run Code Online (Sandbox Code Playgroud)
zer*_*323 19
Spark 2.0
这个问题已在使用SPARK-14241的 Spark 2.0中得到解决.
使用SPARK-14393在Spark 2.1中解决了另一个类似的问题
Spark 1.x
你遇到的问题是相当微妙的,但可以简化为一个简单的事实monotonically_increasing_id
是一个非常难看的功能.它显然不是纯粹的,它的价值取决于完全无法控制的东西.
它不需要任何参数,因此从优化器的角度来看,调用它并不重要,并且可以在所有其他操作之后推送.因此你看到的行为.
如果你看一下你会发现的代码,就会通过扩展MonotonicallyIncreasingID
表达式来明确标记Nondeterministic
.
我不认为有任何优雅的解决方案,但你可以处理的一种方法是在过滤值上添加一个人工依赖.例如,使用这样的UDF:
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
bound = udf(lambda _, v: v, LongType())
(df
.withColumn("rn", monotonically_increasing_id())
# Due to nondeterministic behavior it has to be a separate step
.withColumn("rn", bound("P", "rn"))
.where("P"))
Run Code Online (Sandbox Code Playgroud)
通常,zipWithIndex
在a 上添加索引RDD
然后将其转换回a 可能更清晰DataFrame
.
*上面显示的解决方法不再是Spark 2.x中的有效解决方案(也不是必需的),其中Python UDF是执行计划优化的主题.
归档时间: |
|
查看次数: |
30903 次 |
最近记录: |