Rap*_*oth 5 scala apache-spark
我尝试使用基于窗口函数的逻辑表达式来检测重复记录:
df
.where(count("*").over(Window.partitionBy($"col1",$"col2"))>lit(1))
.show
Run Code Online (Sandbox Code Playgroud)
这给出了Spark 2.1.1:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.catalyst.plans.logical.Aggregate
Run Code Online (Sandbox Code Playgroud)
另一方面,如果我将window函数的结果分配给新列,然后过滤该列,则它可以工作:
df
.withColumn("count", count("*").over(Window.partitionBy($"col1",$"col2"))
.where($"count">lit(1)).drop($"count")
.show
Run Code Online (Sandbox Code Playgroud)
我不知道如何在不使用临时列的情况下编写此代码?
我猜想窗口函数不能在过滤器内使用。您必须创建一个附加列并过滤这一列。
您可以做的是将窗口函数绘制到选择中。
df.select(col("1"), col("2"), lag(col("2"), 1).over(window).alias("2_lag"))).filter(col("2_lag")==col("2"))
Run Code Online (Sandbox Code Playgroud)
然后你就可以在一份声明中得到它。
| 归档时间: |
|
| 查看次数: |
799 次 |
| 最近记录: |