Parsing an XML column from a PySpark DataFrame using a UDF

bab*_*434 6 xml user-defined-functions apache-spark-sql pyspark

I have a scenario where a DataFrame column contains XML data:

+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|     county|created_at|first_name|                  id|meta|name_count|position|sex|               sid|updated_at|            visitors|year|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|      KINGS|1574264158|      ZOEY|00000000-0000-000...| { }|        11|       0|  F|row-r9pv-p86t.ifsp|1574264158|<?xml version="1....|2007|

I want to parse the nested XML field visitors into columns in the DataFrame using a UDF.

XML format -

<?xml version="1.0" encoding="utf-8"?>
<visitors>
  <visitor id="9615" age="68" sex="F" />
  <visitor id="1882" age="34" sex="M" />
  <visitor id="5987" age="23" sex="M" />
</visitors>
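As a quick sanity check of the structure (independent of Spark), the XML can be parsed with Python's standard library to confirm which attributes each `visitor` element carries. This is just an illustration, not part of the Spark pipeline:

```python
import xml.etree.ElementTree as ET

xml = ('<?xml version="1.0" encoding="utf-8"?> <visitors> '
       '<visitor id="9615" age="68" sex="F" /> '
       '<visitor id="1882" age="34" sex="M" /> '
       '<visitor id="5987" age="23" sex="M" /> </visitors>')

root = ET.fromstring(xml)
# Collect the attribute dict of every <visitor> element
visitors = [v.attrib for v in root.iter('visitor')]
print(visitors)
# [{'id': '9615', 'age': '68', 'sex': 'F'}, ...]
```

So each row of the target output corresponds to one `visitor` element, with its attributes becoming columns.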

use*_*471 4

There's a section on the Databricks spark-xml GitHub page that discusses parsing nested XML. It provides a solution using the Scala API, along with a couple of PySpark helper functions that work around the fact that there is no separate Python package for spark-xml. Using those, here's one way to solve the problem:

# 1. Copy helper functions from https://github.com/databricks/spark-xml#pyspark-notes

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
import pyspark.sql.functions as F


def ext_from_xml(xml_column, schema, options={}):
    # Call spark-xml's Scala from_xml via the JVM gateway and wrap the
    # resulting Java Column back into a PySpark Column
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # Infer an XML schema from a single-column dataframe of XML strings
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

# 2. Set up example dataframe

xml = '<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>'

df = spark.createDataFrame([('1', xml)], ['id', 'visitors'])
df.show()

# +---+--------------------+
# | id|            visitors|
# +---+--------------------+
# |  1|<?xml version="1....|
# +---+--------------------+

# 3. Get xml schema and parse xml column

payloadSchema = ext_schema_of_xml_df(df.select("visitors"))
parsed = df.withColumn("parsed", ext_from_xml(F.col("visitors"), payloadSchema))
parsed.show()

# +---+--------------------+--------------------+
# | id|            visitors|              parsed|
# +---+--------------------+--------------------+
# |  1|<?xml version="1....|[[[, 68, 9615, F]...|
# +---+--------------------+--------------------+

# 4. Extract 'visitor' field from StructType
df2 = parsed.select(*parsed.columns[:-1], F.explode(F.col('parsed').getItem('visitor')))
df2.show()

# +---+--------------------+---------------+
# | id|            visitors|            col|
# +---+--------------------+---------------+
# |  1|<?xml version="1....|[, 68, 9615, F]|
# |  1|<?xml version="1....|[, 34, 1882, M]|
# |  1|<?xml version="1....|[, 23, 5987, M]|
# +---+--------------------+---------------+

# 5. Get field names, which will become new columns
# (there's probably a much better way of doing this :D)
new_col_names = [s.split(':')[0] for s in payloadSchema['visitor'].simpleString().split('<')[-1].strip('>>').split(',')]

new_col_names

# ['_VALUE', '_age', '_id', '_sex']

# 6. Create new columns

for c in new_col_names:
    df2 = df2.withColumn(c, F.col('col').getItem(c))
    
df2 = df2.drop('col','_VALUE')

df2.show()

# +---+--------------------+----+----+----+
# | id|            visitors|_age| _id|_sex|
# +---+--------------------+----+----+----+
# |  1|<?xml version="1....|  68|9615|   F|
# |  1|<?xml version="1....|  34|1882|   M|
# |  1|<?xml version="1....|  23|5987|   M|
# +---+--------------------+----+----+----+

One thing to be aware of is that the new column names could duplicate existing column names. In this case the new column names all happen to be prefixed with an underscore, so there are no duplicates, but it's worth checking up front that the nested XML tags don't clash with the existing column names.
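That check can be done with plain list/set operations before creating the columns. A minimal sketch, where `existing_cols` stands in for `df2.columns` from the example above, and the `_xml` suffix is just an arbitrary choice for disambiguation:

```python
# Columns already on the dataframe (df2.columns in the example above)
existing_cols = ['id', 'visitors']

# Field names inferred from the nested XML
new_col_names = ['_VALUE', '_age', '_id', '_sex']

# Any overlap would produce duplicate column names after withColumn
clashes = set(existing_cols) & set(new_col_names)
print(clashes)  # empty set here, so no renaming needed

# One way to disambiguate: suffix the clashing names before creating columns
safe_names = [c + '_xml' if c in clashes else c for c in new_col_names]
```

If `clashes` is non-empty, use `safe_names` (or any other renaming scheme) as the destination column names in step 6.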