聚合函数计算Spark中groupBy的使用情况

Adi*_*iel 30 java scala apache-spark apache-spark-sql pyspark

我试图在pySpark中的一行代码中进行多个操作,并且不确定这是否适用于我的情况.

我的意图是不必将输出保存为新的数据帧.

我目前的代码很简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)
Run Code Online (Sandbox Code Playgroud)

我的目的是count()在使用之后添加groupBy,以获得与timePeriod列的每个值匹配的记录计数,打印\显示为输出.

在尝试使用时,groupBy(..).count().agg(..)我得到例外.

是否有任何方法可以实现这两个count()agg() .show()打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
Run Code Online (Sandbox Code Playgroud)

或者更好的是,将合并的输出输出到agg.show()输出 - 一个额外的列,它表示与行的值匹配的计数记录数.例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315
Run Code Online (Sandbox Code Playgroud)

mrs*_*vas 58

count()可以在里面使用,agg()因为groupBy表达式是相同的.

用Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)
Run Code Online (Sandbox Code Playgroud)

pySpark SQL函数doc

随着Scala

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)
Run Code Online (Sandbox Code Playgroud)

count(1) 将按第一列计算记录,该列等于 count("timePeriod")

用Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)
Run Code Online (Sandbox Code Playgroud)

  • 一个小的语法评论:我非常喜欢 Python 中的 dict 语法,例如 `.agg({"X: "sum", "Y": "sum", "Z": "sum", "blah": "count"})`,这与 `.withColumn("blah", lit(1))` 配合得非常好 - 可能有更好的方法,但我还没有找到它(还!)。 (2认同)