I have the following Apache Spark DataFrame (DF1):
function_name | param1 | param2 | param3 | result
---------------------------------------------------
f1 | a | b | c | 1
f1 | b | d | m | 0
f2 | a | b | c | 0
f2 | b | d | m | 0
f3 | a | b | c | 1
f3 | b | d | m | 1
f4 | a | b | c | 0
f4 | b | d | m | 0
First, I want to group the DataFrame by function_name, collect the results into an ArrayType column, and get a new DataFrame (DF2):
function_name | result_list
--------------------------------
f1 | [1,0]
f2 | [0,0]
f3 | [1,1]
f4 | [0,0]
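I assume this first step could be sketched roughly as follows (if I understand correctly, collect_list after a plain groupBy gives no guarantee about the order of elements in the array; df1 here stands for DF1):

from pyspark.sql import functions as F

# Rough sketch of step 1 (DF1 -> DF2); element order inside result_list is not guaranteed here.
df2 = df1.groupBy("function_name") \
         .agg(F.collect_list("result").alias("result_list"))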
After that, I need to group by the ArrayType column result_list, collecting the function_name values, and I would get a new DataFrame (DF3) like this:
result_list | function_name_lists
------------------------------------
[1,0] | [f1]
[0,0] | [f2,f4]
[1,1] | [f3]
So, my question is twofold. First, can I group by an ArrayType column in Apache Spark at all? And if so, I may end up with tens of millions of values in a single result_list ArrayType field. Will Apache Spark still be able to group by the result_list column in that case?
Yes, you can do that.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Recreate the example DataFrame (DF1 from the question).
data = [['f1','a','b','c',1],
        ['f1','b','d','m',0],
        ['f2','a','b','c',0],
        ['f2','b','d','m',0],
        ['f3','a','b','c',1],
        ['f3','b','d','m',1],
        ['f4','a','b','c',0],
        ['f4','b','d','m',0]]
df = spark.createDataFrame(data, ['function_name','param1','param2','param3','result'])
df.show()
+-------------+------+------+------+------+
|function_name|param1|param2|param3|result|
+-------------+------+------+------+------+
| f1| a| b| c| 1|
| f1| b| d| m| 0|
| f2| a| b| c| 0|
| f2| b| d| m| 0|
| f3| a| b| c| 1|
| f3| b| d| m| 1|
| f4| a| b| c| 0|
| f4| b| d| m| 0|
+-------------+------+------+------+------+
# Window ordered by the param columns: collect_list over w builds the result
# array cumulatively, in a deterministic order, within each function_name.
w = Window().partitionBy("function_name").orderBy(F.col("param1"), F.col("param2"), F.col("param3"))
w1 = Window().partitionBy("function_name")
# Keep only the last row per partition (row_number == max row_number); it holds the complete, ordered result_list.
df1 = df.withColumn("result_list", F.collect_list("result").over(w)).withColumn("result2", F.row_number().over(w))\
    .withColumn("result3", F.max("result2").over(w1))\
    .filter(F.col("result2") == F.col("result3")).drop("param1", "param2", "param3", "result", "result2", "result3")
# Grouping by the ArrayType column result_list works like any other groupBy.
df1.groupBy("result_list")\
   .agg(F.collect_list("function_name").alias("function_name_list")).show()
+-----------+------------------+
|result_list|function_name_list|
+-----------+------------------+
| [1, 0]| [f1]|
| [1, 1]| [f3]|
| [0, 0]| [f2, f4]|
+-----------+------------------+
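As an alternative sketch (assuming Spark 2.4+), the same result can be built with a single groupBy instead of window functions: collect (param1, param2, param3, result) structs, sort the array, then extract only the result field with transform:

# Alternative sketch, Spark 2.4+: element order is fixed by sorting an array of structs.
df2 = df.groupBy("function_name")\
    .agg(F.sort_array(F.collect_list(F.struct("param1", "param2", "param3", "result"))).alias("ordered"))\
    .withColumn("result_list", F.expr("transform(ordered, x -> x.result)"))\
    .drop("ordered")

df2.groupBy("result_list")\
    .agg(F.collect_list("function_name").alias("function_name_list")).show()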
For further analysis, transformation, or cleanup of array-type columns, I would recommend looking into the new higher-order functions available in Spark 2.4 and later.
(collect_list works with Spark 1.6 and later.)
Higher-order functions in open-source Spark:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.collect_list
Databricks release (link): https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html
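For illustration only, here is a hedged sketch of what such higher-order functions (Spark 2.4+) could look like on the result_list column of df1 above, using the SQL functions filter and aggregate through expr:

# Illustrative only: keep the non-zero entries and sum each array, per row.
df1.select(
    "function_name",
    "result_list",
    F.expr("filter(result_list, x -> x != 0)").alias("non_zero_results"),
    F.expr("aggregate(result_list, 0L, (acc, x) -> acc + x)").alias("result_sum"),
).show()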