使用 map 迭代 PySpark 中的数组列

Mat*_*rio 6 python apache-spark pyspark

在 PySpark 中,我有一个由两列组成的数据框:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple, ...   |
| Tom       | [mango, orange, ...  |
| Matteo    | [apple, banana, ...  | 
Run Code Online (Sandbox Code Playgroud)

我想添加一个列concat_result,其中包含内部每个元素与列内字符串的串联array_of_strstr1

+-----------+----------------------+----------------------------------+
| str1      | array_of_str         | concat_result                    |
+-----------+----------------------+----------------------------------+
| John      | [mango, apple, ...   | [mangoJohn, appleJohn, ...       |
| Tom       | [mango, orange, ...  | [mangoTom, orangeTom, ...        |
| Matteo    | [apple, banana, ...  | [appleMatteo, bananaMatteo, ...  |
Run Code Online (Sandbox Code Playgroud)

我试图用它map来迭代数组:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple, ...   |
| Tom       | [mango, orange, ...  |
| Matteo    | [apple, banana, ...  | 
Run Code Online (Sandbox Code Playgroud)

但我得到错误:

TypeError: argument 2 to map() must support iteration
Run Code Online (Sandbox Code Playgroud)

Ric*_*eth 6

您只需要进行一些小的调整即可使其工作:

from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col

concat_udf = udf(lambda con_str, arr: [x + con_str for x in arr],
                   ArrayType(StringType()))
ret = df \
  .select(['str1', 'array_of_str']) \
  .withColumn('concat_result', concat_udf(col("str1"), col("array_of_str")))

ret.show()
Run Code Online (Sandbox Code Playgroud)

您不需要使用map,标准列表理解就足够了。

  • 唯一需要注意的是,如果任何“str1”或“array_of_str”值为“null”,这将会中断。您必须在“udf”中添加显式错误检查。 (2认同)