bab*_*434 6 xml user-defined-functions apache-spark-sql pyspark
I have a scenario where a dataframe column contains XML data.
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|     county|created_at|first_name|                  id|meta|name_count|position|sex|               sid|updated_at|            visitors|year|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|      KINGS|1574264158|      ZOEY|00000000-0000-000...| { }|        11|       0|  F|row-r9pv-p86t.ifsp|1574264158|<?xml version="1....|2007|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
I want to use a UDF to parse the nested XML in the visitors field into columns in the DataFrame.
XML format -
<?xml version="1.0" encoding="utf-8"?>
<visitors>
  <visitor id="9615" age="68" sex="F" />
  <visitor id="1882" age="34" sex="M" />
  <visitor id="5987" age="23" sex="M" />
</visitors>
There's a section on the Databricks spark-xml GitHub page that covers parsing nested XML. It gives a solution using the Scala API, plus a couple of PySpark helper functions to work around the lack of a separate Python package for spark-xml. So, using those, here is one way to solve the problem:
# 1. Copy helper functions from https://github.com/databricks/spark-xml#pyspark-notes
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
import pyspark.sql.functions as F

def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
# 2. Set up example dataframe
xml = '<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>'
df = spark.createDataFrame([('1',xml)],['id','visitors'])
df.show()
# +---+--------------------+
# | id| visitors|
# +---+--------------------+
# | 1|<?xml version="1....|
# +---+--------------------+
# 3. Get xml schema and parse xml column
payloadSchema = ext_schema_of_xml_df(df.select("visitors"))
parsed = df.withColumn("parsed", ext_from_xml(F.col("visitors"), payloadSchema))
parsed.show()
# +---+--------------------+--------------------+
# | id| visitors| parsed|
# +---+--------------------+--------------------+
# | 1|<?xml version="1....|[[[, 68, 9615, F]...|
# +---+--------------------+--------------------+
# 4. Extract 'visitor' field from StructType
df2 = parsed.select(*parsed.columns[:-1],F.explode(F.col('parsed').getItem('visitor')))
df2.show()
# +---+--------------------+---------------+
# | id| visitors| col|
# +---+--------------------+---------------+
# | 1|<?xml version="1....|[, 68, 9615, F]|
# | 1|<?xml version="1....|[, 34, 1882, M]|
# | 1|<?xml version="1....|[, 23, 5987, M]|
# +---+--------------------+---------------+
# 5. Get field names, which will become new columns
# (there's probably a much better way of doing this :D)
new_col_names = [s.split(':')[0] for s in payloadSchema['visitor'].simpleString().split('<')[-1].strip('>>').split(',')]
new_col_names
# ['_VALUE', '_age', '_id', '_sex']
# 6. Create new columns
for c in new_col_names:
    df2 = df2.withColumn(c, F.col('col').getItem(c))
df2 = df2.drop('col','_VALUE')
df2.show()
# +---+--------------------+----+----+----+
# | id| visitors|_age| _id|_sex|
# +---+--------------------+----+----+----+
# | 1|<?xml version="1....| 68|9615| F|
# | 1|<?xml version="1....| 34|1882| M|
# | 1|<?xml version="1....| 23|5987| M|
# +---+--------------------+----+----+----+
One thing to watch out for is new column names duplicating existing ones. In this case the new names all start with an underscore, so there are no duplicates here, but it's best to check beforehand that the nested XML tags don't clash with the existing column names.
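One way to make that check explicit is a quick set intersection before step 6; a sketch using the column and field names from the example above:

```python
# Columns already present in the dataframe (from the example above)
existing_cols = ['id', 'visitors']
# Field names extracted from the nested 'visitor' struct
new_col_names = ['_VALUE', '_age', '_id', '_sex']

# Any overlap means withColumn() would silently overwrite an existing column
clashes = set(new_col_names) & set(existing_cols)
if clashes:
    raise ValueError(f"nested XML fields clash with existing columns: {clashes}")
print("no clashes")  # the underscore prefix keeps the names distinct here
```

If a clash does occur, aliasing the nested fields (e.g. prefixing them with the parent column name) before step 6 avoids the overwrite.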