van*_*ser 5 pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我有一组.xml想要解析的文档。
我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:
我的第一个转换获取.xml内容并将其放入单个单元格中,第二个转换获取该字符串并使用 Python 的xml库将该字符串解析为文档。然后我可以从该文档中提取属性并返回 DataFrame。
我正在使用UDF来执行将字符串内容映射到我想要的字段的过程。
我怎样才能让这个更快/更好地处理大.xml文件?
van*_*ser 10
对于这个问题,我们将结合几种不同的技术来使该代码既可测试又具有高度可扩展性。
\n解析原始文件时,您可以考虑以下几个选项:
\n在我们的例子中,我们可以使用 Databricks 解析器来达到很好的效果。
\n一般来说,您还应该避免使用该.udf方法,因为它可能正在使用,而不是 Spark API 中已有的良好功能。UDF 的性能不如本机方法,仅应在没有其他选项可用时使用。
UDF 掩盖隐藏问题的一个很好的例子是列内容的字符串操作;虽然从技术上讲您可以使用 UDF 来执行诸如拆分和修剪字符串之类的操作,但这些操作已经存在于Spark API中,并且比您自己的代码快几个数量级。
\n我们的设计将使用以下内容:
\n首先,我们需要将其添加.jar到spark_session可用的内部变换中。.jar由于最近的改进,此参数在配置后将允许您在预览/测试和完整构建时使用。以前,这需要完整的构建,但现在不需要了。
我们需要转到我们的transforms-python/build.gradle文件并添加 2 个配置块:
pytest插件condaJars参数并声明.jar依赖关系我/transforms-python/build.gradle现在的样子如下:
buildscript {\n repositories {\n // some other things\n }\n\n dependencies {\n classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"\n }\n}\n\napply plugin: \'com.palantir.transforms.lang.python\'\napply plugin: \'com.palantir.transforms.lang.python-defaults\'\n\ndependencies {\n condaJars "com.databricks:spark-xml_2.13:0.14.0"\n}\n\n// Apply the testing plugin\napply plugin: \'com.palantir.transforms.lang.pytest-defaults\'\n\n// ... some other awesome features you should enable\nRun Code Online (Sandbox Code Playgroud)\n应用此配置后,您需要通过单击底部功能区并点击来重新启动代码辅助会话Refresh
刷新代码辅助后,我们现在可以使用低级功能来解析我们的.xml文件,现在我们需要测试它!
如果我们采用与这里相同的测试驱动开发风格,我们最终会得到/transforms-python/src/myproject/datasets/xml_parse_transform.py以下内容:
buildscript {\n repositories {\n // some other things\n }\n\n dependencies {\n classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"\n }\n}\n\napply plugin: \'com.palantir.transforms.lang.python\'\napply plugin: \'com.palantir.transforms.lang.python-defaults\'\n\ndependencies {\n condaJars "com.databricks:spark-xml_2.13:0.14.0"\n}\n\n// Apply the testing plugin\napply plugin: \'com.palantir.transforms.lang.pytest-defaults\'\n\n// ... some other awesome features you should enable\nRun Code Online (Sandbox Code Playgroud)\n/transforms-python/test/myproject/datasets/sample.xml...内容示例文件:
from transforms.api import transform, Output, Input\nfrom transforms.verbs.dataframes import union_many\n\n\ndef read_files(spark_session, paths):\n parsed_dfs = []\n for file_name in paths:\n parsed_df = spark_session.read.format(\'xml\').options(rowTag="tag").load(file_name)\n parsed_dfs += [parsed_df]\n output_df = union_many(*parsed_dfs, how="wide")\n return output_df\n\n\n@transform(\n the_output=Output("my.awesome.output"),\n the_input=Input("my.awesome.input"),\n)\ndef my_compute_function(the_input, the_output, ctx):\n session = ctx.spark_session\n input_filesystem = the_input.filesystem()\n hadoop_path = input_filesystem.hadoop_path\n files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]\n output_df = read_files(session, files)\n the_output.write_dataframe(output_df)\nRun Code Online (Sandbox Code Playgroud)\n和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py:
<tag>\n<field1>\nmy_value\n</field1>\n</tag>\nRun Code Online (Sandbox Code Playgroud)\n我们现在有:
\n.xml高度可扩展的分布式计算、低级解析器干杯
\n| 归档时间: |
|
| 查看次数: |
1360 次 |
| 最近记录: |