pyspark根据条件从数组结构列中获取元素

Sal*_*dia 6 python dataframe apache-spark apache-spark-sql pyspark

我有一个具有以下架构的 Spark df:

 |-- col1 : string
 |-- col2 : string
 |-- customer: struct
 |    |-- smt: string
 |    |-- attributes: array (nullable = true)
 |    |    |-- element: struct
 |    |    |     |-- key: string
 |    |    |     |-- value: string
Run Code Online (Sandbox Code Playgroud)

df:

#+-------+-------+---------------------------------------------------------------------------+
#|col1   |col2   |customer                                                                   |
#+-------+-------+---------------------------------------------------------------------------+
#|col1_XX|col2_XX|"attributes":[[{"key": "A", "value": "123"},{"key": "B", "value": "456"}]  |
#+-------+-------+---------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

数组的 json 输入如下所示:

...
          "attributes": [
            {
              "key": "A",
              "value": "123"
            },
            {
              "key": "B",
              "value": "456"
            }
          ],
Run Code Online (Sandbox Code Playgroud)

我想循环属性数组并获取元素key="B",然后选择相应的value. 我不想使用爆炸,因为我想避免连接数据帧。是否可以直接使用spark 'Column' 执行此类操作?

预期输出将是:

#+-------+-------+-----+
#|col1   |col2   |B    |                                                               |
#+-------+-------+-----+
#|col1_XX|col2_XX|456  |
#+-------+-------+-----+
Run Code Online (Sandbox Code Playgroud)

任何帮助,将不胜感激

bla*_*hop 8

您可以使用filter函数来过滤结构数组,然后获取value

from pyspark.sql import functions as F

df2 = df.withColumn(
    "B", 
    F.expr("filter(customer.attributes, x -> x.key = 'B')")[0]["value"]
)
Run Code Online (Sandbox Code Playgroud)