Apache Spark 按 DF 分组,将值收集到列表中,然后按列表分组

ale*_*oid 2 apache-spark apache-spark-sql pyspark

我有以下 Apache Spark DataFrame ( DF1 ):

function_name | param1 | param2 | param3 | result
---------------------------------------------------
     f1       |   a    |   b    |   c    |   1        
     f1       |   b    |   d    |   m    |   0
     f2       |   a    |   b    |   c    |   0
     f2       |   b    |   d    |   m    |   0
     f3       |   a    |   b    |   c    |   1
     f3       |   b    |   d    |   m    |   1
     f4       |   a    |   b    |   c    |   0
     f4       |   b    |   d    |   m    |   0
Run Code Online (Sandbox Code Playgroud)

首先,我想对 DataFrame by 进行分组function_name,将结果收集到ArrayType并接收新的 DataFrame ( DF2 ):

function_name | result_list
--------------------------------
     f1       |  [1,0]
     f2       |  [0,0]
     f3       |  [1,1]
     f4       |  [0,0]
Run Code Online (Sandbox Code Playgroud)

之后,我需要通过分组function_name进行收集,我将收到如下新的 DataFrame ( DF3 ):ArrayTyperesult_list

result_list |  function_name_lists
------------------------------------
    [1,0]   |   [f1]
    [0,0]   |   [f2,f4]
    [1,1]   |   [f3]
Run Code Online (Sandbox Code Playgroud)

所以,我有一个问题 - 首先,我可以在 Apache Spark 中使用按 ArrayType 列分组吗?如果是这样,我可能会在result_listArrayType 单个字段中拥有数千万个值。result_list在这种情况下,Apache Spark 能够按列分组吗?

mur*_*ash 6

是的,你可以这么做。

创建数据框:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
list=[['f1','a','b','c',1],
     ['f1','b','d','m',0],
     ['f2','a','b','c',0],
     ['f2','b','d','m',0],
     ['f3','a','b','c',1],
     ['f3','b','d','m',1],
     ['f4','a','b','c',0],
      ['f4','b','d','m',0]]

df= spark.createDataFrame(list,['function_name','param1','param2','param3','result'])
df.show()

+-------------+------+------+------+------+
|function_name|param1|param2|param3|result|
+-------------+------+------+------+------+
|           f1|     a|     b|     c|     1|
|           f1|     b|     d|     m|     0|
|           f2|     a|     b|     c|     0|
|           f2|     b|     d|     m|     0|
|           f3|     a|     b|     c|     1|
|           f3|     b|     d|     m|     1|
|           f4|     a|     b|     c|     0|
|           f4|     b|     d|     m|     0|
+-------------+------+------+------+------+
Run Code Online (Sandbox Code Playgroud)

按function_name分组,然后按result_list分组(使用collect_list),使用param1,param2,param3的顺序:

w=Window().partitionBy("function_name").orderBy(F.col("param1"),F.col("param2"),F.col("param3"))
w1=Window().partitionBy("function_name")
df1=df.withColumn("result_list", F.collect_list("result").over(w)).withColumn("result2",F.row_number().over(w))\
.withColumn("result3",F.max("result2").over(w1))\
.filter(F.col("result2")==F.col("result3")).drop("param1","param2","param3","result","result2","result3")


df1.groupBy("result_list")\
.agg(F.collect_list("function_name").alias("function_name_list")).show()

    +-----------+------------------+
    |result_list|function_name_list|
    +-----------+------------------+
    |     [1, 0]|              [f1]|
    |     [1, 1]|              [f3]|
    |     [0, 0]|          [f2, f4]|
    +-----------+------------------+
Run Code Online (Sandbox Code Playgroud)

为了对数组类型列进行进一步的分析、转换或清理,我建议您查看 Spark2.4 及更高版本中新的高阶函数。

(collect_list适用于spark1.6及以上版本)

开源中的高阶函数:

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.collect_list

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_contains开始

Databricks 发布:链接:https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html

  • 我也添加了官方文档的链接。感谢您指出 (3认同)
  • 我只是说你的链接可能会令人困惑。Collect_list 从 Spark 1.6 开始可用。而是放置官方文档的链接。 (2认同)
  • 其他数组函数,如 array_distinct、array_except、array_join、array_max 等可以被视为在 Spark 2.4 中发布的高阶函数。但是你是对的,其他转换、过滤器高阶函数仅在 databricks 运行时上。 (2认同)
  • @MohammadMurtazaHashmi 感谢您的回答!我更新了一点我的问题,并在 DF1 中添加了 3 个新列 - `param1 | 参数2 | 参数3`。在收集的列表中,我想保留基于 param1、param2、param3 列的结果顺序。那么添加 `.over(Window.partitionBy("function_name").orderBy("param1, param2, param3")))` 就可以了? (2认同)
  • @alexanoid 我已经更新了答案,请检查一下。它将适用于您想要的任意多个不同的 function_name 条目,因为 rownumber 和 max 只会采用具有完整有序数组的行。我还尝试了 param1、2、3 的不同组合,并且顺序成功更改。让我知道 (2认同)
  • 当您进行分组时,它会获取函数和聚合的所有唯一性,但窗口是逐步聚合的。因此第一行只有 [0],第二行将有 [0,1],如果该名称有最后第三行,那么它将有 [0,1,1]。所以我强制它只获取窗口的最后一行,该行将在数组中组装所有内容,这就是为什么我使用 rownumber 和 max 来获取最后填充的行。windows r 与 groupby 不同 (2认同)
  • 我现在明白了!这是一个绝妙的解决方案。非常感谢您的帮助和耐心! (2认同)