GroupBy and concat array columns pyspark

Car*_*llo 12 apache-spark-sql pyspark

I have this dataframe:

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

I want to convert it into the following df:

+-----+------------------+
|store|      values      |
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

I did this:

from  pyspark.sql import functions as F
df.groupBy("store").agg(F.collect_list("values"))

but the result has these WrappedArrays:

+-----+----------------------------------------------+
|store|collect_list(values)                          |
+-----+----------------------------------------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2    |[WrappedArray(2), WrappedArray(3)]            |
+-----+----------------------------------------------+

Is there a way to convert these WrappedArrays into a single concatenated array? Or should I approach this differently?

Thanks!

小智 21

Now that the flatten function is available, this has become much easier. You just flatten the collected array after the groupBy.

# 1. Create the DF

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(["store", "values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

# 2. Group by store

df = df.groupBy("store").agg(F.collect_list("values"))

+-----+--------------------+
|store|collect_list(values)|
+-----+--------------------+
|    1|[[1, 2, 3], [4, 5...|
|    2|          [[2], [3]]|
+-----+--------------------+

# 3. Finally... flatten the array

df = df.withColumn("flatten_array", F.flatten("collect_list(values)"))

+-----+--------------------+------------------+
|store|collect_list(values)|     flatten_array|
+-----+--------------------+------------------+
|    1|[[1, 2, 3], [4, 5...|[1, 2, 3, 4, 5, 6]|
|    2|          [[2], [3]]|            [2, 3]|
+-----+--------------------+------------------+

  • Nice. Just to add: all of this can be done in one step with `df.groupBy("store").agg(F.flatten(F.collect_list("values")))` (see the sketch below). (4 upvotes)
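
A minimal sketch of that one-step variant, assuming the same df as above and Spark 2.4+ (where F.flatten is available); df_flat is just an illustrative name, and the order of the concatenated sub-arrays within each store is not guaranteed:

from pyspark.sql import functions as F

# Collect each store's arrays and flatten them in a single aggregation
df_flat = df.groupBy("store").agg(
    F.flatten(F.collect_list("values")).alias("values")
)
df_flat.show()
# +-----+------------------+
# |store|            values|
# +-----+------------------+
# |    1|[1, 2, 3, 4, 5, 6]|
# |    2|            [2, 3]|
# +-----+------------------+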

des*_*aut 13

You need a flattening UDF; starting from your own df:

spark.version
# u'2.2.0'

from pyspark.sql import functions as F
import pyspark.sql.types as T

from functools import reduce  # built-in in Python 2, needed as an import in Python 3

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+ 
# |store|                         collect_list(values) | 
# +-----+----------------------------------------------+ 
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]| 
# |2    |[WrappedArray(2), WrappedArray(3)]            | 
# +-----+----------------------------------------------+

df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
df3.show(truncate=False)
# +-----+------------------+
# |store|           values |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+


Mik*_*der 9

For a simple problem like this, you could also use the explode function. I don't know how the performance compares with the accepted udf answer, though.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(['store', 'values'])

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# |    1|     1|
# |    1|     2|
# |    1|     3|
# |    1|     4|
# |    1|     5|
# |    1|     6|
# |    2|     2|
# |    2|     3|
# +-----+------+

df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store|           values |
# +-----+------------------+
# |1    |[4, 5, 6, 1, 2, 3]|
# |2    |[2, 3]            |
# +-----+------------------+

Note: you can use F.collect_set() in the aggregation, or .drop_duplicates() on df2, to remove duplicate values.
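
A short sketch of those two de-duplication options, assuming the same exploded df2 as above (df3_distinct is just an illustrative name; note that collect_set does not preserve element order):

# Option 1: collect only distinct values per store
df3_distinct = df2.groupBy('store').agg(F.collect_set('values').alias('values'))

# Option 2: drop duplicate (store, values) rows first, then collect as before
df3_distinct = df2.drop_duplicates().groupBy('store').agg(F.collect_list('values').alias('values'))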

If you want to maintain ordered values in the collected list, I found the following method in another SO answer:

from pyspark.sql.window import Window

w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1    |1     |[1]                |
# |1    |2     |[1, 2]             |
# |1    |3     |[1, 2, 3]          |
# |1    |4     |[1, 2, 3, 4]       |
# |1    |5     |[1, 2, 3, 4, 5]    |
# |1    |6     |[1, 2, 3, 4, 5, 6] |
# |2    |2     |[2]                |
# |2    |3     |[2, 3]             |
# +-----+------+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

If the values themselves don't determine the order, you can use F.posexplode() and use the 'pos' column in your window functions instead of 'values' to determine order. Note: you will also need a higher-level ordering column to order the original arrays, and then use the position within each array to order that array's elements.

df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2) , (2, [2], 1),(2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values   |array_order|
# +-----+---------+-----------+
# |1    |[1, 2, 3]|1          |
# |1    |[4, 5, 6]|2          |
# |2    |[2]      |1          |
# |2    |[3]      |2          |
# +-----+---------+-----------+

df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values   |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1    |[1, 2, 3]|1          |0  |1  |
# |1    |[1, 2, 3]|1          |1  |2  |
# |1    |[1, 2, 3]|1          |2  |3  |
# |1    |[4, 5, 6]|2          |0  |4  |
# |1    |[4, 5, 6]|2          |1  |5  |
# |1    |[4, 5, 6]|2          |2  |6  |
# |2    |[2]      |1          |0  |2  |
# |2    |[3]      |2          |0  |3  |
# +-----+---------+-----------+---+---+

w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values   |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1    |[1, 2, 3]|1          |0  |1  |[1]                |
# |1    |[1, 2, 3]|1          |1  |2  |[1, 2]             |
# |1    |[1, 2, 3]|1          |2  |3  |[1, 2, 3]          |
# |1    |[4, 5, 6]|2          |0  |4  |[1, 2, 3, 4]       |
# |1    |[4, 5, 6]|2          |1  |5  |[1, 2, 3, 4, 5]    |
# |1    |[4, 5, 6]|2          |2  |6  |[1, 2, 3, 4, 5, 6] |
# |2    |[2]      |1          |0  |2  |[2]                |
# |2    |[3]      |2          |0  |3  |[2, 3]             |
# +-----+---------+-----------+---+---+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

Edit: if you want to keep some columns along for the ride and they don't need to be aggregated, you can include them in the groupBy or rejoin them after the aggregation (examples below). If they do need to be aggregated, group only by 'store' and add whatever aggregation function you need on the 'other' column(s) to the .agg() call.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a') , (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store|   values|other|
# +-----+---------+-----+
# |    1|[1, 2, 3]|    a|
# |    1|[4, 5, 6]|    a|
# |    2|      [2]|    b|
# |    2|      [3]|    b|
# +-----+---------+-----+

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# |    1|     1|    a|
# |    1|     2|    a|
# |    1|     3|    a|
# |    1|     4|    a|
# |    1|     5|    a|
# |    1|     6|    a|
# |    2|     2|    b|
# |    2|     3|    b|
# +-----+------+-----+

df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

df4 = (
    df.drop('values')
    .join(
        df2.groupBy('store')
        .agg(F.collect_list('values').alias('values')),
        on=['store'], how='inner'
    )
    .drop_duplicates()
)
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+


Bal*_*ala 7

I would probably do it this way.

>>> df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
>>> df.show()
+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

>>> df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x,y: x + y).toDF(['store','values']).show()
+-----+------------------+
|store|            values|
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+