Joh*_*ohn 9 scala dataframe apache-spark apache-spark-sql
我有一个数据帧(spark):
id value
3 0
3 1
3 0
4 1
4 0
4 0
Run Code Online (Sandbox Code Playgroud)
我想创建一个新的数据帧:
3 0
3 1
4 1
Run Code Online (Sandbox Code Playgroud)
需要删除每个id后1(值)之后的所有行.我尝试使用spark dateframe(Scala)中的窗口函数.但无法找到解决方案.看起来我正朝着错误的方向前进.
我正在寻找Scala.Thanks的解决方案
使用monotonically_increasing_id输出
scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]
scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]
scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
Run Code Online (Sandbox Code Playgroud)
如果我们在原始数据帧中进行了排序转换,那么解决方案就无法工作.那个时候monotonically_increasing_id()是基于原始DF生成的,而不是排序DF.I之前已经错过了那个要求.
欢迎所有建议.
一种方法是使用monotonically_increasing_id()和自联接:
val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 3| 0|
| 4| 1|
| 4| 0|
| 4| 0|
+---+-----+
Run Code Online (Sandbox Code Playgroud)
现在我们生成一个以idx增加的名称命名的列Long:
val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()
Run Code Online (Sandbox Code Playgroud)
现在我们得到min(idx)每个id地方value = 1:
val minIdx = dataWithIndex
.filter($"value" === 1)
.groupBy($"id")
.agg(min($"idx"))
.toDF("r_id", "min_idx")
Run Code Online (Sandbox Code Playgroud)
现在我们加入min(idx)到原来的DataFrame:
dataWithIndex.join(
minIdx,
($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
Run Code Online (Sandbox Code Playgroud)
注意: monotonically_increasing_id()根据行的分区生成其值.每次dataWithIndex重新评估时,此值可能会更改.在上面的代码中,由于懒惰的评估,只有当我调用被评估的final时才会show这样monotonically_increasing_id().
如果您想强制该值保持不变,例如,您可以使用它show来逐步评估上述内容,请取消注释以上这一行:
// dataWithIndex.cache()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
46998 次 |
| 最近记录: |