Jav*_*ia · Tags: scala, window, apache-spark, partition-by
Hello, I am trying to propagate the last value of each window to the rest of the rows in that window, in a column called count, so that I can build a flag identifying whether a record is the last one in its window. I tried the approach below, but it did not work.
Sample DF:
val df_197 = Seq[(Int, Int, Int, Int)]((1,1,7,10),(1,10,4,300),(1,3,14,50),(1,20,24,70),(1,30,12,90),(2,10,4,900),(2,25,30,40),(2,15,21,60),(2,5,10,80))
  .toDF("policyId","FECMVTO","aux","IND_DEF")
  .orderBy(asc("policyId"), asc("FECMVTO"))
df_197.show

+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
|       1|      1|  7|     10|
|       1|      3| 14|     50|
|       1|     10|  4|    300|
|       1|     20| 24|     70|
|       1|     30| 12|     90|
|       2|      5| 10|     80|
|       2|     10|  4|    900|
|       2|     15| 21|     60|
|       2|     25| 30|     40|
+--------+-------+---+-------+

val juntar_riesgo = 1
val var_entidad_2 = $"aux"

// Partition by one or two fields depending on the value of juntar_riesgo
// window_number_2 will be created based on this partitioning
val winSpec = if (juntar_riesgo == 1) {
  Window.partitionBy($"policyId").orderBy($"FECMVTO")
} else {
  Window.partitionBy(var_entidad_2, $"policyId").orderBy("FECMVTO")
}

val df_198 = df_197.withColumn("window_number", row_number().over(winSpec))
  .withColumn("count", last("window_number", true) over (winSpec))
  .withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"), 1).otherwise(lit(0))).show

Result (I need the count column to be 5 for every row of the first partition and 4 for every row of the second):
+--------+-------+---+-------+-------------+-----+------------+
|policyId|FECMVTO|aux|IND_DEF|window_number|count|FLG_LAST_WDW|
+--------+-------+---+-------+-------------+-----+------------+
|       1|      1|  7|     10|            1|    1|           1|
|       1|      3| 14|     50|            2|    2|           1|
|       1|     10|  4|    300|            3|    3|           1|
|       1|     20| 24|     70|            4|    4|           1|
|       1|     30| 12|     90|            5|    5|           1|
|       2|      5| 10|     80|            1|    1|           1|
|       2|     10|  4|    900|            2|    2|           1|
|       2|     15| 21|     60|            3|    3|           1|
|       2|     25| 30|     40|            4|    4|           1|
+--------+-------+---+-------+-------------+-----+------------+

Then I read that when you use orderBy after the Window.partitionBy clause, you must specify .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) to achieve what I need. However, when I tried that, I got this error:
val juntar_riesgo = 1
val var_entidad_2 = $"aux"

// Partition by one or two fields depending on the value of juntar_riesgo
// window_number_2 will be created based on this partitioning
val winSpec = if (juntar_riesgo == 1) {
  Window.partitionBy($"policyId").orderBy($"FECMVTO")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
} else {
  Window.partitionBy(var_entidad_2, $"policyId").orderBy("FECMVTO")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
}

val df_198 = df_197.withColumn("window_number", row_number().over(winSpec))
  .withColumn("count", last("window_number", true) over (winSpec))
  .withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"), 1).otherwise(lit(0))).show

ERROR: org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$());

Thanks for your help!
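For what it's worth, the error happens because row_number() requires the frame (unboundedPreceding, currentRow), while last() needs the full partition. One workaround (a sketch only, assuming Spark 2.x and the df_197 defined above) is to give each function its own window spec instead of sharing one:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// row_number() demands a running frame, so keep its window spec unframed;
// last() gets a second spec with the explicit unbounded frame.
val orderedWin = Window.partitionBy($"policyId").orderBy($"FECMVTO")
val fullWin = orderedWin
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df_fixed = df_197
  .withColumn("window_number", row_number().over(orderedWin))
  .withColumn("count", last("window_number", true).over(fullWin))
  .withColumn("FLG_LAST_WDW", when($"window_number" === $"count", 1).otherwise(0))
```

This keeps the original last-based logic while satisfying the frame requirement of each window function.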
You should not use last here, but rather max, without specifying an ordering:
val df_198 = df_197
.withColumn("window_number", row_number().over(Window.partitionBy($"policyId").orderBy($"FECMVTO")))
.withColumn("count", max("window_number") over (Window.partitionBy($"policyId")))
.withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show
+--------+-------+---+-------+-------------+-----+------------+
|policyId|FECMVTO|aux|IND_DEF|window_number|count|FLG_LAST_WDW|
+--------+-------+---+-------+-------------+-----+------------+
| 1| 1| 7| 10| 1| 5| 0|
| 1| 3| 14| 50| 2| 5| 0|
| 1| 10| 4| 300| 3| 5| 0|
| 1| 20| 24| 70| 4| 5| 0|
| 1| 30| 12| 90| 5| 5| 1|
| 2| 5| 10| 80| 1| 4| 0|
| 2| 10| 4| 900| 2| 4| 0|
| 2| 15| 21| 60| 3| 4| 0|
| 2| 25| 30| 40| 4| 4| 1|
+--------+-------+---+-------+-------------+-----+------------+
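Equivalently, the partition size can be computed directly with count instead of max over row_number (a sketch using the same df_197; the count window needs no ordering, so no frame issue arises):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// An unordered partition window defaults to the whole partition,
// so count(lit(1)) yields the partition size on every row.
val df_alt = df_197
  .withColumn("window_number",
    row_number().over(Window.partitionBy($"policyId").orderBy($"FECMVTO")))
  .withColumn("count", count(lit(1)).over(Window.partitionBy($"policyId")))
  .withColumn("FLG_LAST_WDW", when($"window_number" === $"count", 1).otherwise(0))
```

Both variants mark exactly one row per policyId, the one with the highest FECMVTO.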
Note that you can write this more concisely by computing row_number over a descending order and then checking row_number === 1:
val df_198 = df_197
  .withColumn("FLG_LAST_WDW", when(row_number().over(Window.partitionBy($"policyId").orderBy($"FECMVTO".desc)) === 1, 1).otherwise(0))
.show
Viewed: 5601 times