pyspark - 在地图类型结构中创建DataFrame分组列

Alg*_*g_D 7 python sql dictionary pyspark spark-dataframe

我的DataFrame具有以下结构:

-------------------------
| Brand | type |  amount|
-------------------------
|  B   |   a  |   10   |
|  B   |   b  |   20   |
|  C   |   c  |   30   |
-------------------------
Run Code Online (Sandbox Code Playgroud)

我希望通过分组减少行数,type并将其amount放入一个类型的单个列中:Map 因此,Brand它将是唯一的,并且对于每个组合MAP_type_AMOUNT都有.key,valuetype amount

我认为Spark.sql可能有一些函数来帮助这个过程,或者我是否必须让RDD成为DataFrame并将我自己的"转换"转换为map类型?

预期:

   -------------------------
    | Brand | MAP_type_AMOUNT 
    -------------------------
    |  B    | {a: 10, b:20} |
    |  C    | {c: 30}       |
    -------------------------
Run Code Online (Sandbox Code Playgroud)

小智 8

Prem的回答略有改进(抱歉,我还不能发表评论)

func.create_map而不是func.struct.见文档

import pyspark.sql.functions as func
df = sc.parallelize([('B','a',10),('B','b',20),
('C','c',30)]).toDF(['Brand','Type','Amount'])

df_converted = df.groupBy("Brand").\
    agg(func.collect_list(func.create_map(func.col("Type"),
    func.col("Amount"))).alias("MAP_type_AMOUNT"))

print df_converted.collect()
Run Code Online (Sandbox Code Playgroud)

输出:

[Row(Brand=u'B', MAP_type_AMOUNT=[{u'a': 10}, {u'b': 20}]),
 Row(Brand=u'C', MAP_type_AMOUNT=[{u'c': 30}])]
Run Code Online (Sandbox Code Playgroud)

  • 这不会生成一个字典,而是生成一个字典数组,每个字典只有一个键和值对。 (6认同)

Lou*_*ang 7

一起使用collect_listmap_from_arrays可以实现这一点

import pyspark.sql.functions as F

df_converted = (
    df.groupBy('Brand')
    .agg(
        F.collect_list('type').alias('type'),
        F.collect_list('amount').alias('amount'),
    )
    .withColumn('MAP_type_AMOUNT', F.map_from_arrays('type', 'amount'))
    .drop('type', 'amount')
)
Run Code Online (Sandbox Code Playgroud)

输出

+-----+------------------+
|Brand|   MAP_type_AMOUNT|
+-----+------------------+
|    C|         [c -> 30]|
|    B|[b -> 20, a -> 10]|
+-----+------------------+
Run Code Online (Sandbox Code Playgroud)


Pre*_*rem 5

您可以输入以下内容,但不完全是“地图”

import pyspark.sql.functions as func
df = sc.parallelize([('B','a',10),('B','b',20),('C','c',30)]).toDF(['Brand','Type','Amount'])

df_converted = df.groupBy("Brand").\
    agg(func.collect_list(func.struct(func.col("Type"), func.col("Amount"))).alias("MAP_type_AMOUNT"))
df_converted.show()
Run Code Online (Sandbox Code Playgroud)

输出为:

+-----+----------------+
|Brand| MAP_type_AMOUNT|
+-----+----------------+
|    B|[[a,10], [b,20]]|
|    C|        [[c,30]]|
+-----+----------------+
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助!