如何在 Palantir Foundry 中解析 xml 文档?

van*_*ser 5 pyspark palantir-foundry foundry-code-repositories foundry-python-transform

我有一组.xml想要解析的文档。

我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:

我的第一个转换获取.xml内容并将其放入单个单元格中,第二个转换获取该字符串并使用 Python 的xml库将该字符串解析为文档。然后我可以从该文档中提取属性并返回 DataFrame。

我正在使用UDF来执行将字符串内容映射到我想要的字段的过程。

我怎样才能让这个更快/更好地处理大.xml文件?

van*_*ser 10

对于这个问题,我们将结合几种不同的技术来使该代码既可测试又具有高度可扩展性。

\n

理论

\n

解析原始文件时,您可以考虑以下几个选项:

\n
    \n
  1. \xe2\x9d\x8c 您可以编写自己的解析器来从文件中读取字节并将其转换为 Spark 可以理解的数据。\n
      \n
    • 由于工程时间和不可扩展的架构,尽可能不鼓励这样做。当您执行此操作时,它不会利用分布式计算,因为您必须将整个原始文件引入解析方法才能使用它。这不是对您资源的有效利用。
    • \n
    \n
  2. \n
  3. \xe2\x9a\xa0 您可以使用自己的非 Spark 解析器库,例如问题中提到的 XML Python 库\n
      \n
    • 虽然这比编写自己的解析器更容易实现,但它仍然没有利用 Spark 中的分布式计算。让某些东西运行起来更容易,但最终会达到性能极限,因为它没有利用仅在编写 Spark 库时公开的低级 Spark 功能。
    • \n
    \n
  4. \n
  5. \xe2\x9c\x85 您可以使用 Spark 本机原始文件解析器\n
      \n
    • 在所有情况下,这都是首选选项,因为它利用低级 Spark 功能,并且不需要您编写自己的代码。如果存在低级 Spark 解析器,则应该使用它。
    • \n
    \n
  6. \n
\n

在我们的例子中,我们可以使用 Databricks 解析器来达到很好的效果。

\n

一般来说,您还应该避免使用该.udf方法,因为它可能正在使用,而不是 Spark API 中已有的良好功能。UDF 的性能不如本机方法,仅应在没有其他选项可用时使用。

\n

UDF 掩盖隐藏问题的一个很好的例子是列内容的字符串操作;虽然从技术上讲您可以使用 UDF 来执行诸如拆分和修剪字符串之类的操作,但这些操作已经存在于Spark API中,并且比您自己的代码快几个数量级。

\n

设计

\n

我们的设计将使用以下内容:

\n
    \n
  1. 通过Databricks XML 解析器完成低级 Spark 优化文件解析
  2. \n
  3. 测试驱动的原始文件解析,如此处所述
  4. \n
\n

连接解析器

\n

首先,我们需要将其添加.jarspark_session可用的内部变换中。.jar由于最近的改进,此参数在配置后将允许您在预览/测试和完整构建时使用。以前,这需要完整的构建,但现在不需要了。

\n

我们需要转到我们的transforms-python/build.gradle文件并添加 2 个配置块:

\n
    \n
  1. 启用pytest插件
  2. \n
  3. 启用condaJars参数并声明.jar依赖关系
  4. \n
\n

/transforms-python/build.gradle现在的样子如下:

\n
buildscript {\n    repositories {\n       // some other things\n    }\n\n    dependencies {\n        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"\n    }\n}\n\napply plugin: \'com.palantir.transforms.lang.python\'\napply plugin: \'com.palantir.transforms.lang.python-defaults\'\n\ndependencies {\n    condaJars "com.databricks:spark-xml_2.13:0.14.0"\n}\n\n// Apply the testing plugin\napply plugin: \'com.palantir.transforms.lang.pytest-defaults\'\n\n// ... some other awesome features you should enable\n
Run Code Online (Sandbox Code Playgroud)\n

应用此配置后,您需要通过单击底部功能区并点击来重新启动代码辅助会话Refresh

\n

刷新

\n

刷新代码辅助后,我们现在可以使用低级功能来解析我们的.xml文件,现在我们需要测试它!

\n

测试解析器

\n

如果我们采用与这里相同的测试驱动开发风格,我们最终会得到/transforms-python/src/myproject/datasets/xml_parse_transform.py以下内容:

\n
buildscript {\n    repositories {\n       // some other things\n    }\n\n    dependencies {\n        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"\n    }\n}\n\napply plugin: \'com.palantir.transforms.lang.python\'\napply plugin: \'com.palantir.transforms.lang.python-defaults\'\n\ndependencies {\n    condaJars "com.databricks:spark-xml_2.13:0.14.0"\n}\n\n// Apply the testing plugin\napply plugin: \'com.palantir.transforms.lang.pytest-defaults\'\n\n// ... some other awesome features you should enable\n
Run Code Online (Sandbox Code Playgroud)\n

/transforms-python/test/myproject/datasets/sample.xml...内容示例文件:

\n
from transforms.api import transform, Output, Input\nfrom transforms.verbs.dataframes import union_many\n\n\ndef read_files(spark_session, paths):\n    parsed_dfs = []\n    for file_name in paths:\n        parsed_df = spark_session.read.format(\'xml\').options(rowTag="tag").load(file_name)\n        parsed_dfs += [parsed_df]\n    output_df = union_many(*parsed_dfs, how="wide")\n    return output_df\n\n\n@transform(\n    the_output=Output("my.awesome.output"),\n    the_input=Input("my.awesome.input"),\n)\ndef my_compute_function(the_input, the_output, ctx):\n    session = ctx.spark_session\n    input_filesystem = the_input.filesystem()\n    hadoop_path = input_filesystem.hadoop_path\n    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]\n    output_df = read_files(session, files)\n    the_output.write_dataframe(output_df)\n
Run Code Online (Sandbox Code Playgroud)\n

和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py

\n
<tag>\n<field1>\nmy_value\n</field1>\n</tag>\n
Run Code Online (Sandbox Code Playgroud)\n

我们现在有:

\n
    \n
  1. .xml高度可扩展的分布式计算、低级解析器
  2. \n
  3. 测试驱动的设置,我们可以快速迭代以获得正确的功能
  4. \n
\n

干杯

\n