python user-defined-functions apache-spark apache-spark-sql pyspark
I am working with pyspark. I have a Spark dataframe in the following format:

| person_id | person_attributes |
|-----------|-------------------|
| id_1 | "department=Sales__title=Sales_executive__level=junior" |
| id_2 | "department=Engineering__title=Software Engineer__level=entry-level" |
I wrote a Python function that takes person_id and person_attributes and returns JSON in the following format:

{"id_1": {"properties": [{"department": "Sales"}, {"title": "Sales_executive"}, {}]}}

But I don't know how to register it as a UDF in pyspark with the correct output type. Here is the Python code:
def create_json_from_string(pid, attribute_string):
    results = []
    attribute_map = {}
    output = {}
    # Split the attribute_string into key=value pairs and store them in attribute_map
    if attribute_string != '':
        attribute_string = attribute_string.split("__")  # This will be a list
        for substring in attribute_string:
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    for k, v in attribute_map.items():
        temp = {k: v}
        results.append(temp)
    output = {pid: {"properties": results}}
    return output
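For illustration, here is a quick check of the function on the first sample row (the function is restated so the snippet runs on its own):

```python
def create_json_from_string(pid, attribute_string):
    results = []
    attribute_map = {}
    # Split the attribute_string into key=value pairs and store them in attribute_map
    if attribute_string != '':
        for substring in attribute_string.split("__"):
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    for k, v in attribute_map.items():
        results.append({k: v})
    return {pid: {"properties": results}}

out = create_json_from_string("id_1", "department=Sales__title=Sales_executive__level=junior")
# out == {"id_1": {"properties": [{"department": "Sales"},
#                                 {"title": "Sales_executive"},
#                                 {"level": "junior"}]}}
```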
You need to modify the function so that it returns only the map of strings, rather than building the complete structure. The function can then be applied to a single column instead of the whole row. Something like this:
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col

def struct_from_string(attribute_string):
    attribute_map = {}
    if attribute_string != '':
        attribute_string = attribute_string.split("__")  # This will be a list
        for substring in attribute_string:
            k, v = substring.split("=")
            attribute_map[str(k)] = str(v)
    return attribute_map

my_parse_string_udf = spark.udf.register("my_parse_string", struct_from_string,
                                         MapType(StringType(), StringType()))
It can then be used as follows:
df2 = df.select(col("person_id"), my_parse_string_udf(col("person_attributes")))
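If the goal is still the full nested JSON from the question, one option (a sketch, not part of the answer above) is a second UDF that serializes the whole structure to a JSON string. The helper below can be exercised without a SparkSession; the commented-out lines show a hypothetical registration (`my_full_json` and `df3` are illustrative names):

```python
import json

def full_json_from_row(pid, attribute_string):
    """Build the full nested structure for one row and serialize it to JSON."""
    results = []
    if attribute_string:
        for substring in attribute_string.split("__"):
            k, v = substring.split("=", 1)
            results.append({k: v})
    return json.dumps({pid: {"properties": results}})

# Hypothetical registration as a string-returning UDF (assumes an active SparkSession `spark`):
# from pyspark.sql.types import StringType
# my_full_json_udf = spark.udf.register("my_full_json", full_json_from_row, StringType())
# df3 = df.select(my_full_json_udf(col("person_id"), col("person_attributes")))
```

Returning a `StringType` JSON string avoids having to declare a deeply nested Spark schema for the output.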
Viewed: 8860 times