如何向Spark DataFrame添加持久的行ID列?

Kai*_*Kai 32 dataframe apache-spark apache-spark-sql

这个问题并不新鲜,但我在Spark中发现了令人惊讶的行为.我需要向DataFrame添加一列行ID.我使用了DataFrame方法monotonically_increasing_id(),它确实给了我一个额外的uniques行ID(顺便说一句,它们不是连续的,但是是唯一的).

我遇到的问题是,当我过滤DataFrame时,重新分配生成的DataFrame中的行ID.两个DataFrame如下所示.

  • 第一个是添加了行ID的初始DataFrame,如下所示:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
    Run Code Online (Sandbox Code Playgroud)
  • 第二个DataFrame是在col P via上过滤后获得的数据帧df.filter(col("P")).

问题由custId 169的rowId说明,在初始DataFrame中为5,但在过滤后,当custId 169被过滤掉时,rowId(5)被重新分配给custmId 773!我不知道为什么这是默认行为.

我希望rowIds它"粘"; 如果我从DataFrame中删除行,我不希望他们的ID"重新使用",我希望它们与行一起消失.有可能吗?我没有看到任何标志从monotonically_increasing_id方法请求此行为.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
|      875|[54,120,1...|   true|    4|
|      773|[3136,317...|   true|    5|
Run Code Online (Sandbox Code Playgroud)

zer*_*323 19

Spark 2.0

  • 这个问题已在使用SPARK-14241的 Spark 2.0中得到解决.

  • 使用SPARK-14393在Spark 2.1中解决了另一个类似的问题

Spark 1.x

你遇到的问题是相当微妙的,但可以简化为一个简单的事实monotonically_increasing_id是一个非常难看的功能.它显然不是纯粹的,它的价值取决于完全无法控制的东西.

它不需要任何参数,因此从优化器的角度来看,调用它并不重要,并且可以在所有其他操作之后推送.因此你看到的行为.

如果你看一下你会发现的代码,就会通过扩展MonotonicallyIncreasingID表达式来明确标记Nondeterministic.

我不认为有任何优雅的解决方案,但你可以处理的一种方法是在过滤值上添加一个人工依赖.例如,使用这样的UDF:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf

bound = udf(lambda _, v: v, LongType()) 

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))  
  .where("P"))
Run Code Online (Sandbox Code Playgroud)

通常,zipWithIndex在a 上添加索引RDD然后将其转换回a 可能更清晰DataFrame.


*上面显示的解决方法不再是Spark 2.x中的有效解决方案(也不是必需的),其中Python UDF是执行计划优化的主题.