如何使用pyspark从某些字段分组的给定数据集中获取max(date)?

coo*_*716 17 sql apache-spark apache-spark-sql pyspark pyspark-sql

我有数据框中的数据如下:

  datetime             | userId | memberId | value |    
2016-04-06 16:36:...   | 1234   | 111      | 1
2016-04-06 17:35:...   | 1234   | 222      | 5
2016-04-06 17:50:...   | 1234   | 111      | 8
2016-04-06 18:36:...   | 1234   | 222      | 9
2016-04-05 16:36:...   | 4567   | 111      | 1
2016-04-06 17:35:...   | 4567   | 222      | 5
2016-04-06 18:50:...   | 4567   | 111      | 8
2016-04-06 19:36:...   | 4567   | 222      | 9
Run Code Online (Sandbox Code Playgroud)

我需要在userid,memberid中找到max(datetime)groupby.当我尝试如下:

df2 = df.groupBy('userId','memberId').max('datetime')
Run Code Online (Sandbox Code Playgroud)

我收到的错误是:

org.apache.spark.sql.AnalysisException: "datetime" is not a numeric
column. Aggregation function can only be applied on a numeric column.;
Run Code Online (Sandbox Code Playgroud)

我想要的输出如下:

userId | memberId | datetime
1234   |  111     | 2016-04-06 17:50:...
1234   |  222     | 2016-04-06 18:36:...
4567   |  111     | 2016-04-06 18:50:...
4567   |  222     | 2016-04-06 19:36:...
Run Code Online (Sandbox Code Playgroud)

有人可以帮助我如何使用PySpark数据帧获得给定数据中的最大日期?

zer*_*323 26

对于非数字,但Orderable类型,您可以使用aggmax直接:

from pyspark.sql.functions import col, max as max_

df = sc.parallelize([
    ("2016-04-06 16:36", 1234, 111, 1),
    ("2016-04-06 17:35", 1234, 111, 5),
]).toDF(["datetime", "userId", "memberId", "value"])

(df.withColumn("datetime", col("datetime").cast("timestamp"))
    .groupBy("userId", "memberId")
    .agg(max_("datetime")))

## +------+--------+--------------------+
## |userId|memberId|       max(datetime)|
## +------+--------+--------------------+
## |  1234|     111|2016-04-06 17:35:...|
## +------+--------+--------------------+
Run Code Online (Sandbox Code Playgroud)

  • @thentangler max_ 是 pyspark 本身的 max 函数的别名(请参阅代码中的 import 语句)。如果你只执行 max('datetime'),它将使用 python 函数 'max',该函数不适用于列。 (3认同)
  • @ElsaLi `从 pyspark.sql.functions import min as min_; df.withColumn.groupBy(...).agg(min_("datetime"), max_("datetime"))` (2认同)
  • 我们是否需要使用 _ 而不仅仅是 ``max('datetime')``` ? (2认同)
  • @user2177768感谢您指出,是否可以保留列“value”而不与原始 df 进行连接 (2认同)