Car*_*llo 12 apache-spark-sql pyspark
I have this dataframe:
df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
+-----+---------+
|store| values|
+-----+---------+
| 1|[1, 2, 3]|
| 1|[4, 5, 6]|
| 2| [2]|
| 2| [3]|
+-----+---------+
I want to transform it into the following df:
+-----+------------------+
|store| values |
+-----+------------------+
| 1|[1, 2, 3, 4, 5, 6]|
| 2| [2, 3]|
+-----+------------------+
I did this:
from pyspark.sql import functions as F
df.groupBy("store").agg(F.collect_list("values"))
But the result has these WrappedArrays:
+-----+----------------------------------------------+
|store|collect_list(values) |
+-----+----------------------------------------------+
|1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2 |[WrappedArray(2), WrappedArray(3)] |
+-----+----------------------------------------------+
Is there a way to convert these WrappedArrays into a single flat array? Or should I approach this differently?
Thanks!
小智 21
Now that the flatten function is available (Spark 2.4+), this has become much easier. You just flatten the collected array after the groupBy.
# 1. Create the DF
from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store","values"])
+-----+---------+
|store| values|
+-----+---------+
| 1|[1, 2, 3]|
| 1|[4, 5, 6]|
| 2| [2]|
| 2| [3]|
+-----+---------+
# 2. Group by store
df = df.groupBy("store").agg(F.collect_list("values"))
+-----+--------------------+
|store|collect_list(values)|
+-----+--------------------+
| 1|[[1, 2, 3], [4, 5...|
| 2| [[2], [3]]|
+-----+--------------------+
# 3. Finally, flatten the array
df = df.withColumn("flatten_array", F.flatten("collect_list(values)"))
+-----+--------------------+------------------+
|store|collect_list(values)| flatten_array|
+-----+--------------------+------------------+
| 1|[[1, 2, 3], [4, 5...|[1, 2, 3, 4, 5, 6]|
| 2| [[2], [3]]| [2, 3]|
+-----+--------------------+------------------+
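As a more compact sketch (assuming Spark 2.4+, where F.flatten is available), steps 2 and 3 can be combined into a single aggregation on the original df; df_flat is just a hypothetical name:
# Flatten inside the aggregation itself
df_flat = df.groupBy("store").agg(F.flatten(F.collect_list("values")).alias("values"))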
des*_*aut 13
You need a flattening UDF; starting from your own df:
spark.version
# u'2.2.0'
from pyspark.sql import functions as F
import pyspark.sql.types as T
from functools import reduce  # required in Python 3, where reduce is no longer a builtin

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))
df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store| collect_list(values) |
# +-----+----------------------------------------------+
# |1 |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
# |2 |[WrappedArray(2), WrappedArray(3)] |
# +-----+----------------------------------------------+
df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
df3.show(truncate=False)
# +-----+------------------+
# |store| values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
For a simple problem like this, you could also use the explode function. I don't know how its performance compares to the accepted UDF answer, though.
from pyspark.sql import functions as F
df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(['store', 'values'])
df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# | 1| 1|
# | 1| 2|
# | 1| 3|
# | 1| 4|
# | 1| 5|
# | 1| 6|
# | 2| 2|
# | 2| 3|
# +-----+------+
df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store| values |
# +-----+------------------+
# |1 |[4, 5, 6, 1, 2, 3]|
# |2 |[2, 3] |
# +-----+------------------+
Note: you can use F.collect_set() in the aggregation, or .drop_duplicates() on df2, to remove duplicate values.
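A minimal sketch of both options, assuming the exploded df2 from above (df3_dedup is a hypothetical name):
# Option 1: collect only unique values (element order is not guaranteed)
df3_dedup = df2.groupBy('store').agg(F.collect_set('values').alias('values'))
# Option 2: drop duplicate rows first, then collect
df3_dedup = df2.drop_duplicates().groupBy('store').agg(F.collect_list('values').alias('values'))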
If you want to maintain ordered values in the collected list, I found the following approach in another SO answer:
from pyspark.sql.window import Window
w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1 |1 |[1] |
# |1 |2 |[1, 2] |
# |1 |3 |[1, 2, 3] |
# |1 |4 |[1, 2, 3, 4] |
# |1 |5 |[1, 2, 3, 4, 5] |
# |1 |6 |[1, 2, 3, 4, 5, 6] |
# |2 |2 |[2] |
# |2 |3 |[2, 3] |
# +-----+------+-------------------+
df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
If the values themselves cannot determine the order, you can use F.posexplode() and use the 'pos' column in your window function instead of 'values' to determine order. Note: you will also need a higher-level order column to sort the original arrays themselves, and then use the position within each array to sort its elements.
df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2) , (2, [2], 1),(2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values |array_order|
# +-----+---------+-----------+
# |1 |[1, 2, 3]|1 |
# |1 |[4, 5, 6]|2 |
# |2 |[2] |1 |
# |2 |[3] |2 |
# +-----+---------+-----------+
df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1 |[1, 2, 3]|1 |0 |1 |
# |1 |[1, 2, 3]|1 |1 |2 |
# |1 |[1, 2, 3]|1 |2 |3 |
# |1 |[4, 5, 6]|2 |0 |4 |
# |1 |[4, 5, 6]|2 |1 |5 |
# |1 |[4, 5, 6]|2 |2 |6 |
# |2 |[2] |1 |0 |2 |
# |2 |[3] |2 |0 |3 |
# +-----+---------+-----------+---+---+
w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1 |[1, 2, 3]|1 |0 |1 |[1] |
# |1 |[1, 2, 3]|1 |1 |2 |[1, 2] |
# |1 |[1, 2, 3]|1 |2 |3 |[1, 2, 3] |
# |1 |[4, 5, 6]|2 |0 |4 |[1, 2, 3, 4] |
# |1 |[4, 5, 6]|2 |1 |5 |[1, 2, 3, 4, 5] |
# |1 |[4, 5, 6]|2 |2 |6 |[1, 2, 3, 4, 5, 6] |
# |2 |[2] |1 |0 |2 |[2] |
# |2 |[3] |2 |0 |3 |[2, 3] |
# +-----+---------+-----------+---+---+-------------------+
df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values |
# +-----+------------------+
# |1 |[1, 2, 3, 4, 5, 6]|
# |2 |[2, 3] |
# +-----+------------------+
Edit: If you want to keep some other columns along for the ride and they don't need to be aggregated, you can include them in the groupBy or re-join them after the aggregation (examples below). If they do need to be aggregated, group only by 'store' and add whatever aggregation function you need on the 'other' column(s) to the .agg() call (a sketch of that case follows the examples).
from pyspark.sql import functions as F
df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a') , (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store| values|other|
# +-----+---------+-----+
# | 1|[1, 2, 3]| a|
# | 1|[4, 5, 6]| a|
# | 2| [2]| b|
# | 2| [3]| b|
# +-----+---------+-----+
df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# | 1| 1| a|
# | 1| 2| a|
# | 1| 3| a|
# | 1| 4| a|
# | 1| 5| a|
# | 1| 6| a|
# | 2| 2| b|
# | 2| 3| b|
# +-----+------+-----+
df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other| values|
# +-----+-----+------------------+
# | 1| a|[1, 2, 3, 4, 5, 6]|
# | 2| b| [2, 3]|
# +-----+-----+------------------+
df4 = (
df.drop('values')
.join(
df2.groupBy('store')
.agg(F.collect_list('values').alias('values')),
on=['store'], how='inner'
)
.drop_duplicates()
)
# +-----+-----+------------------+
# |store|other| values|
# +-----+-----+------------------+
# | 1| a|[1, 2, 3, 4, 5, 6]|
# | 2| b| [2, 3]|
# +-----+-----+------------------+
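If the 'other' columns do need to be aggregated, a minimal sketch would look like this (F.first() is just an assumption here; substitute whatever aggregate your data actually requires):
df5 = df2.groupBy('store').agg(
    F.collect_list('values').alias('values'),
    F.first('other').alias('other')  # hypothetical choice of aggregate
)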
I would probably do it this way:
>>> df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]) , (2,[2]),(2,[3])]).toDF(["store", "values"])
>>> df.show()
+-----+---------+
|store| values|
+-----+---------+
| 1|[1, 2, 3]|
| 1|[4, 5, 6]|
| 2| [2]|
| 2| [3]|
+-----+---------+
>>> df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x,y: x + y).toDF(['store','values']).show()
+-----+------------------+
|store| values|
+-----+------------------+
| 1|[1, 2, 3, 4, 5, 6]|
| 2| [2, 3]|
+-----+------------------+