How to update a column based on a condition (a value within a group)?

sen*_*r p 13 scala apache-spark apache-spark-sql

I have the following df:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn| blue|
|  3|  fn|green|
+---+----+-----+

If any value in the color column is red, then all values of the color column should be updated to red, as shown below:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn|  red|
|  3|  fn|  red|
+---+----+-----+

I can't figure it out. Please help; I tried the following code:

val gp = jdbcDF.filter($"dept".contains("fn"))
  //.withColumn("newone", when($"dept" === "fn", "RED").otherwise("NULL"))
gp.show()
gp.map(
  row => {
    val row1 = row.getAs[String](1)
    var row2 = row.getAs[String](2)
    val make = if (row1 == "fn") row2 = "red"
    Row(row(0), row(1), make)
  }
).collect().foreach(println)

Jac*_*ski 11

Given:

val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "fn", "color")

Do the calculation:

val redOrNot = df.groupBy("fn")
  .agg(collect_set('color) as "values")
  .withColumn("hasRed", array_contains('values, "red"))

// when without otherwise yields null for groups that have no red
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
  .withColumn("resultColor", colorPicker)
  // coalesce keeps the original color wherever resultColor is null
  .withColumn("color", coalesce('resultColor, 'color))
  .select('id, 'fn, 'color)

result looks as follows (which seems to be the answer):

scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
|  1| fn|  red|
|  2| fn|  red|
|  3| fn|  red|
|  4| aa| blue|
|  5| aa|green|
|  6| bb|  red|
|  7| bb|  red|
|  8| aa| blue|
+---+---+-----+

You can chain when operators and supply a default value with otherwise. Consult the scaladoc of the when operator.
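As a rough sketch of the when/otherwise idea, the colorPicker and coalesce steps could be folded into one expression. The object name WhenOtherwiseSketch, the helper recolor, and the local SparkSession settings are made up for illustration; the grouping step is the same one used in the calculation above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_contains, col, collect_set, when}

object WhenOtherwiseSketch {
  // Hypothetical helper: same grouping as above, but a single
  // when(...).otherwise(...) replaces the colorPicker + coalesce pair.
  def recolor(df: DataFrame): DataFrame = {
    val redOrNot = df.groupBy("fn")
      .agg(collect_set(col("color")).as("values"))
      .withColumn("hasRed", array_contains(col("values"), "red"))

    df.join(redOrNot, "fn")
      // otherwise supplies the default, so no null ever needs to be coalesced
      .withColumn("color", when(col("hasRed"), "red").otherwise(col("color")))
      .select("id", "fn", "color")
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session, only for trying the sketch out
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("when-otherwise-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, "fn", "red"),
      (2, "fn", "blue"),
      (4, "aa", "blue")
    ).toDF("id", "fn", "color")

    recolor(df).orderBy("id").show()
    spark.stop()
  }
}
```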

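I think you could do something very similar (and perhaps more efficient) with window operators or a user-defined aggregate function (UDAF), but... well... I don't currently know how. Leave a comment to inspire others ;-)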
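For what it's worth, the window-operator variant might look something like the sketch below. It is not part of the original answer and makes no claim about being faster; WindowRedSketch, spreadRed, and the local SparkSession settings are hypothetical names chosen for illustration. The idea is to flag red rows with 1, take the max of that flag over each fn partition, and recolor every row whose partition has a max of 1.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, when}

object WindowRedSketch {
  // Hypothetical helper: recolors a whole group as red when any row in it is red.
  def spreadRed(df: DataFrame): DataFrame = {
    val byGroup = Window.partitionBy("fn")
    // 1 for red rows, 0 for everything else (including null colors)
    val redFlag = when(col("color") === "red", 1).otherwise(0)
    // true on every row of a group that contains at least one red row
    val groupHasRed = max(redFlag).over(byGroup) === 1

    df.withColumn("color", when(groupHasRed, lit("red")).otherwise(col("color")))
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session, only for trying the sketch out
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-red-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, "fn", "red"),
      (2, "fn", "blue"),
      (3, "fn", "green"),
      (4, "aa", "blue"),
      (5, "aa", "green"),
      (6, "bb", "red"),
      (7, "bb", "red"),
      (8, "aa", "blue")
    ).toDF("id", "fn", "color")

    spreadRed(df).orderBy("id").show()
    spark.stop()
  }
}
```

This avoids the explicit join, although the window still has to shuffle the data by fn, so whether it actually beats the groupBy version would need measuring on real data.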

P.S. I learned a lot! Thanks for the idea!

  • @JacekLaskowski: What about window functions? Your answer is the best, but I think this could also be done with window functions, couldn't it? (3 upvotes)

e9f*_*079 9

An efficient solution that does not require expensive grouping:

// All groups with `red`
df.where($"color" === "red").select($"fn".alias("fn_")).distinct
  // Join with input
  .join(df.as("df"), $"fn" === $"fn_", "rightouter")
  // Replace `color`
  .withColumn("color", when($"fn_".isNull, $"color").otherwise(lit("red")))
  .drop("fn_")


Dan*_*lds 5

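Conditionally update the DataFrame if it satisfies some property. In this case the property is "the color column contains 'red'". The idiomatic way to express this is to filter with the desired predicate and then check whether any rows satisfy it. No join is needed.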

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame

def makeAllRedIfAnyAreRed(df: DataFrame) = {
    val containsRed = df.filter(df("color") === "red").count() > 0
    if (containsRed) df.withColumn("color", lit("red")) else df
}
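
A possible usage sketch for the helper above, with the caveat that it looks at the DataFrame as a whole rather than per group. The example therefore narrows the data to one dept first, as the question does with its filter. AllRedSketch, the sample rows, and the local SparkSession settings are assumptions for illustration; the helper body is repeated only to keep the sketch self-contained.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object AllRedSketch {
  // Same helper as above, repeated so this sketch compiles on its own.
  def makeAllRedIfAnyAreRed(df: DataFrame): DataFrame = {
    val containsRed = df.filter(df("color") === "red").count() > 0
    if (containsRed) df.withColumn("color", lit("red")) else df
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session, only for trying the sketch out
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("all-red-sketch")
      .getOrCreate()
    import spark.implicits._

    val all = Seq(
      (1, "fn", "red"),
      (2, "fn", "blue"),
      (3, "fn", "green"),
      (4, "aa", "blue")
    ).toDF("sno", "dept", "color")

    // Restrict to a single dept first; the helper has no notion of groups.
    val fnOnly = all.filter($"dept" === "fn")
    makeAllRedIfAnyAreRed(fnOnly).show()

    spark.stop()
  }
}
```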