根据条件拆分Spark DataFrame

Mar*_*ace 5 scala dataframe apache-spark apache-spark-sql

我需要类似于randomSplit函数的东西:

val Array(df1, df2) = myDataFrame.randomSplit(Array(0.6, 0.4))
Run Code Online (Sandbox Code Playgroud)

但是,我需要根据布尔条件拆分myDataFrame.是否存在以下任何内容?

val Array(df1, df2) = myDataFrame.booleanSplit(col("myColumn") > 100)
Run Code Online (Sandbox Code Playgroud)

我不想做两个单独的.filter调用.

Tza*_*har 6

不幸的是,DataFrame API没有这样的方法,要根据条件进行拆分,您必须执行两个单独的filter转换:

myDataFrame.cache() // recommended to prevent repeating the calculation

val condition = col("myColumn") > 100
val df1 = myDataFrame.filter(condition)
val df2 = myDataFrame.filter(not(condition))
Run Code Online (Sandbox Code Playgroud)

  • 注意:如果此特定示例中的“myColumn”为“NULL”,则不会导致正确的拆分。您将丢失具有 NULL 的列,因为该列在 (> 100) 和 (<= 100) (否定谓词)上都不会产生 true。如果您在该列上检查“and isNotNull”,这将被正确否定(变为“or isNull”)并产生您正在寻找的正确拆分。 (3认同)

Ber*_*ler 5

我知道两次缓存和过滤看起来有点难看,但请记住,DataFrame 被转换为 RDD,它们被延迟评估,即仅当它们直接或间接在操作中使用时才进行评估。

如果存在问题中建议的方法booleanSplit,结果将被转换为两个 RDD,每个 RDD 都会被延迟评估。两个 RDD 中的一个将首先被评估,另一个将被其次评估,严格地在第一个之后。在评估第一个 RDD 时,第二个 RDD 尚未“存在”(编辑:刚刚注意到 RDD API 有一个类似的问题,其答案给出了类似的推理

为了真正获得任何性能优势,第二个 RDD 必须在第一个 RDD 的迭代期间(或者实际上,在两个 RDD 的父 RDD 的迭代期间(由第一个 RDD 的迭代触发)被(部分)持久化) )。在我看来,这与 RDD API 其余部分的设计不太相符。不确定性能提升是否证明这一点是合理的。

我认为您能实现的最好方法是避免直接在业务代码中编写两个过滤器调用,通过编写一个带有方法的隐式类作为实用程序方法,以与Tzach Zohar 的答案booleanSplit类似的方式执行该部分,也许使用类似的东西因此条件的值不会计算两次。myDataFrame.withColumn("__condition_value", condition).cache()