我注意到当我在这里运行与我的示例相同的代码但使用unionorunionByName或unionAll代替 时join,我的查询计划花费的时间要长得多,并且可能导致驱动程序 OOM。
此处包含的代码仅供参考,与for()循环内部发生的情况略有不同。
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("measure_1", T.FloatType(), False),
T.StructField("measure_2", T.FloatType(), False),
])
data = [
{"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
{"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False)
])
right_data = …Run Code Online (Sandbox Code Playgroud) pyspark palantir-foundry foundry-code-repositories foundry-python-transform
为了概括 python 函数,我想向 python 库添加函数,以便我可以在多个存储库中使用这些函数。任何人请回答以下问题。
1) 如何创建我们自己的 python 库 2) 如何跨多个存储库导入这些库
conda pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我有一组.xml想要解析的文档。
我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:
我的第一个转换获取.xml内容并将其放入单个单元格中,第二个转换获取该字符串并使用 Python 的xml库将该字符串解析为文档。然后我可以从该文档中提取属性并返回 DataFrame。
我正在使用UDF来执行将字符串内容映射到我想要的字段的过程。
我怎样才能让这个更快/更好地处理大.xml文件?
pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我的 Foundry 实例中有一个管道设置,它使用增量计算,但由于某种原因没有达到我的预期。也就是说,我想读取转换的先前输出并获取日期的最大值,然后仅读取紧随该最大日期之后的数据的输入。
由于某种原因,它没有达到我的预期,并且在构建/分析/修改代码过程中逐步执行代码非常令人沮丧。
我的代码如下所示:
from pypsark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta
JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
T.StructField("date", T.DateType()),
T.StructField("value", T.IntegerType())
])
@incremental(semantic_version=1)
@transform(
my_input=Input("/path/to/my/input"),
my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
"""Filter the input to only rows that are a day after the last written output and process them"""
# Get the previous output and full current input
previous_output_df = …Run Code Online (Sandbox Code Playgroud) palantir-foundry foundry-code-repositories foundry-python-transform
我注意到我的代码存储库警告我在 for/while 循环中使用 withColumn 是一种反模式。为什么不推荐这样做?这不是PySpark API的正常使用吗?
pyspark palantir-foundry foundry-code-repositories foundry-python-transform