Bry*_*ind 2 group-by pyspark spark-dataframe
我有一个pyspark数据框。例如,
d= hiveContext.createDataFrame([("A", 1), ("B", 2), ("D", 3), ("D", 3), ("A", 4), ("D", 3)],["Col1", "Col2"])
+----+----+
|Col1|Col2|
+----+----+
| A| 1|
| B| 2|
| D| 3|
| D| 3|
| A| 4|
| D| 3|
+----+----+
Run Code Online (Sandbox Code Playgroud)
我想分组Col1,然后创建一个列表Col2。我需要弄平组。我确实有很多专栏。
+----+----------+
|Col1| Col2|
+----+----------+
| A| [1,4] |
| B| [2] |
| D| [3,3,3]|
+----+----------+
Run Code Online (Sandbox Code Playgroud)
您可以做一个groupBy()并将其collect_list()用作您的汇总函数:
import pyspark.sql.functions as f
d.groupBy('Col1').agg(f.collect_list('Col2').alias('Col2')).show()
#+----+---------+
#|Col1| Col2|
#+----+---------+
#| B| [2]|
#| D|[3, 3, 3]|
#| A| [1, 4]|
#+----+---------+
Run Code Online (Sandbox Code Playgroud)
如果要合并多个列,则可以collect_list()在每个列上使用,然后使用struct()和合并结果列表udf()。考虑以下示例:
创建虚拟数据
from operator import add
import pyspark.sql.functions as f
# create example dataframe
d = sqlcx.createDataFrame(
[
("A", 1, 10),
("B", 2, 20),
("D", 3, 30),
("D", 3, 10),
("A", 4, 20),
("D", 3, 30)
],
["Col1", "Col2", "Col3"]
)
Run Code Online (Sandbox Code Playgroud)
将所需的列收集到列表中
假设您有一个要收集到列表中的列的列表。您可以执行以下操作:
cols_to_combine = ['Col2', 'Col3']
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine]).show()
#+----+---------+------------+
#|Col1| Col2| Col3|
#+----+---------+------------+
#| B| [2]| [20]|
#| D|[3, 3, 3]|[30, 10, 30]|
#| A| [4, 1]| [20, 10]|
#+----+---------+------------+
Run Code Online (Sandbox Code Playgroud)
将结果列表合并为一列
现在,我们希望将列表列合并为一个列表。如果使用struct(),我们将获得以下信息:
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', f.struct(*cols_to_combine).alias('Combined'))\
.show(truncate=False)
#+----+------------------------------------------------+
#|Col1|Combined |
#+----+------------------------------------------------+
#|B |[WrappedArray(2),WrappedArray(20)] |
#|D |[WrappedArray(3, 3, 3),WrappedArray(10, 30, 30)]|
#|A |[WrappedArray(1, 4),WrappedArray(10, 20)] |
#+----+------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
展平包装阵列
差不多好了。我们只需要组合WrappedArrays。我们可以通过以下方式实现udf():
combine_wrapped_arrays = f.udf(lambda val: reduce(add, val), ArrayType(IntegerType()))
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', combine_wrapped_arrays(f.struct(*cols_to_combine)).alias('Combined'))\
.show(truncate=False)
#+----+---------------------+
#|Col1|Combined |
#+----+---------------------+
#|B |[2, 20] |
#|D |[3, 3, 3, 30, 10, 30]|
#|A |[1, 4, 10, 20] |
#+----+---------------------+
Run Code Online (Sandbox Code Playgroud)
参考文献
一种更简单的方法,无需处理WrappedArrays:
from operator import add
combine_udf = lambda cols: f.udf(
lambda *args: reduce(add, args),
ArrayType(IntegerType())
)
d.groupBy('Col1').agg(*[f.collect_list(c).alias(c) for c in cols_to_combine])\
.select('Col1', combine_udf(cols_to_combine)(*cols_to_combine).alias('Combined'))\
.show(truncate=False)
#+----+---------------------+
#|Col1|Combined |
#+----+---------------------+
#|B |[2, 20] |
#|D |[3, 3, 3, 30, 10, 30]|
#|A |[1, 4, 10, 20] |
#+----+---------------------+
Run Code Online (Sandbox Code Playgroud)
注意:仅当所有列的数据类型相同时,此最后一步才有效。您不能使用此功能来组合包装数组和混合类型。
从火花 2.4 你可以使用 pyspark.sql.functions.flatten
import pyspark.sql.functions as f
df.groupBy('Col1').agg(f.flatten(f.collect_list('Col2')).alias('Col2')).show()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1541 次 |
| 最近记录: |