Pyspark 在 groupby 中创建字典

neo*_*bot 3 apache-spark pyspark

是否可以在 pyspark 中创建字典groupBy.agg()?这是一个玩具示例:

import pyspark
from pyspark.sql import Row
import pyspark.sql.functions as F

sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

toy_data = spark.createDataFrame([
    Row(id=1, key='a', value="123"),
    Row(id=1, key='b', value="234"),
    Row(id=1, key='c', value="345"),
    Row(id=2, key='a', value="12"),
    Row(id=2, key='x', value="23"),
    Row(id=2, key='y', value="123")])

toy_data.show()

+---+---+-----+
| id|key|value|
+---+---+-----+
|  1|  a|  123|
|  1|  b|  234|
|  1|  c|  345|
|  2|  a|   12|
|  2|  x|   23|
|  2|  y|  123|
+---+---+-----+
Run Code Online (Sandbox Code Playgroud)

这是预期的输出

---+------------------------------------
id |  key_value
---+------------------------------------
1  | {"a": "123", "b": "234", "c": "345"}
2  | {"a": "12", "x": "23", "y": "123"}
---+------------------------------------
Run Code Online (Sandbox Code Playgroud)

======================================

我试过这个,但不起作用。

toy_data.groupBy("id").agg(
    F.create_map(col("key"),col("value")).alias("key_value")
)
Run Code Online (Sandbox Code Playgroud)

这会产生以下错误:

AnalysisException: u"expression '`key`' is neither present in the group by, nor is it an aggregate function....
Run Code Online (Sandbox Code Playgroud)

小智 8

agg组件必须包含实际的聚合函数。解决这个问题的一种方法是结合collect_list

聚合函数:返回具有重复项的对象列表。

struct

创建一个新的结构列。

map_from_entries

集合函数:返回从给定条目数组创建的映射。

这就是你要做的:

toy_data.groupBy("id").agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("key", "value"))).alias("key_value")
).show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
+---+------------------------------+
|id |key_value                     |
+---+------------------------------+
|1  |[a -> 123, b -> 234, c -> 345]|
|2  |[a -> 12, x -> 23, y -> 123]  |
+---+------------------------------+
Run Code Online (Sandbox Code Playgroud)

  • 看起来我的 Spark 版本没有“map_from_entries”。运行上面的代码时,它给我这个错误消息: ```AttributeError: 'module' object has no attribute 'map_from_entries'``` 还有其他我可以尝试的方法吗? (2认同)