dataframe spark scala 为每组取(MAX-MIN)

Sim*_*aPK -1 scala apache-spark apache-spark-sql

我有一个来自处理部分的数据框,看起来像:

   +---------+------+-----------+
|Time     |group |value      |
+---------+------+-----------+
|    28371|    94|        906|
|    28372|    94|        864|
|    28373|    94|        682|
|    28374|    94|        574|
|    28383|    95|        630|
|    28384|    95|        716|
|    28385|    95|        913|
Run Code Online (Sandbox Code Playgroud)

我想为每个组取(最大时间的值 - 最小时间的值),以获得以下结果:

+------+-----------+
|group |  value    |
+------+-----------+
|    94|       -332|
|    95|        283|
Run Code Online (Sandbox Code Playgroud)

预先感谢您的帮助

Vam*_*ala 5

df.groupBy("groupCol").agg(max("value")-min("value"))
Run Code Online (Sandbox Code Playgroud)

根据 OP 编辑​​的问题,这里有一种在 PySpark 中执行此操作的方法。这个想法是按每组时间的升序和降序计算行号,并使用这些值进行减法。

from pyspark.sql import Window
from pyspark.sql import functions as func
w_asc = Window.partitionBy(df.groupCol).orderBy(df.time)
w_desc = Window.partitionBy(df.groupCol).orderBy(func.desc(df.time))
df = df.withColumn(func.row_number().over(w_asc).alias('rnum_asc')) \
       .withColumn(func.row_number().over(w_desc).alias('rnum_desc'))
df.groupBy(df.groupCol) \
  .agg((func.max(func.when(df.rnum_desc==1,df.value))-func.max(func.when(df.rnum_asc==1,df.value))).alias('diff')).show()
Run Code Online (Sandbox Code Playgroud)

如果first_value在 Spark SQL 中提供窗口函数会更容易。使用 SQL 解决此问题的通用方法是

select distinct groupCol,diff
from (
select t.*
      ,first_value(val) over(partition by groupCol order by time) - 
       first_value(val) over(partition by groupCol order by time desc) as diff
from tbl t
) t 
Run Code Online (Sandbox Code Playgroud)

  • 回答完再改是不公平的。无论如何,提供了答案。 (3认同)