如何编写一个返回字典类型的 python 函数 udf

NG_*_*_21 5 python user-defined-functions apache-spark apache-spark-sql pyspark

我正在与 pyspark 合作。我有一个火花数据框,其格式如下

| person_id | person_attributes
____________________________________________________________________________
| id_1    "department=Sales__title=Sales_executive__level=junior"
| id_2    "department=Engineering__title=Software Engineer__level=entry-level" 
Run Code Online (Sandbox Code Playgroud)

我编写了一个 python 函数,它接受 person_id 和 person_attributes 并返回以下格式的 json {"id_1":{"properties":[{"department":'Sales'},{"title":'Sales_executive'},{}]}}

但我不知道如何将其注册为具有正确输出类型的udfin pyspark。这是Python代码

def create_json_from_string(pid,attribute_string):
    results = []
    attribute_map ={}
    output = {}

    # Split the attribute_string into key,value pair and store it in attribute map
    if attribute_string != '':
        attribute_string = attribute_string.split("__") # This will be a list 
        for substring in attribute_string:
            k,v = substring.split("=")
            attribute_map[str(k)] = str(v)

    for k,v in attribute_map.items():
        temp = {k:v}
        results.append(temp)

    output ={pid : {"properties": results }}
    return(output)
Run Code Online (Sandbox Code Playgroud)

Ale*_*Ott 4

您需要修改函数以仅返回字符串的映射,而不是形成完整的结构。之后,函数可以应用于单个列,而不是整行。像这样的东西:

from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import col

def struct_from_string(attribute_string):
    attribute_map ={}
    if attribute_string != '':
        attribute_string = attribute_string.split("__") # This will be a list 
        for substring in attribute_string:
            k,v = substring.split("=")
            attribute_map[str(k)] = str(v)
    return attribute_map

my_parse_string_udf = spark.udf.register("my_parse_string", struct_from_string, 
     MapType(StringType(), StringType()))
Run Code Online (Sandbox Code Playgroud)

然后可以如下使用:

df2 = df.select(col("person_id"), my_parse_string_udf(col("person_attributes")))
Run Code Online (Sandbox Code Playgroud)