我注意到当我在这里运行与我的示例相同的代码但使用unionorunionByName或unionAll代替 时join,我的查询计划花费的时间要长得多,并且可能导致驱动程序 OOM。
此处包含的代码仅供参考,与for()循环内部发生的情况略有不同。
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("measure_1", T.FloatType(), False),
T.StructField("measure_2", T.FloatType(), False),
])
data = [
{"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
{"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False)
])
right_data = …Run Code Online (Sandbox Code Playgroud) pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我有一个数据源,每天都会提供一个大的 .txt 文件(50-75GB)。该文件包含多个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,如何有效地做到这一点?
我有一组.xml想要解析的文档。
我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:
我的第一个转换获取.xml内容并将其放入单个单元格中,第二个转换获取该字符串并使用 Python 的xml库将该字符串解析为文档。然后我可以从该文档中提取属性并返回 DataFrame。
我正在使用UDF来执行将字符串内容映射到我想要的字段的过程。
我怎样才能让这个更快/更好地处理大.xml文件?
pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我的 Foundry 实例中有一个管道设置,它使用增量计算,但由于某种原因没有达到我的预期。也就是说,我想读取转换的先前输出并获取日期的最大值,然后仅读取紧随该最大日期之后的数据的输入。
由于某种原因,它没有达到我的预期,并且在构建/分析/修改代码过程中逐步执行代码非常令人沮丧。
我的代码如下所示:
from pypsark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta
JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
T.StructField("date", T.DateType()),
T.StructField("value", T.IntegerType())
])
@incremental(semantic_version=1)
@transform(
my_input=Input("/path/to/my/input"),
my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
"""Filter the input to only rows that are a day after the last written output and process them"""
# Get the previous output and full current input
previous_output_df = …Run Code Online (Sandbox Code Playgroud) palantir-foundry foundry-code-repositories foundry-python-transform
我有一个包含数百个测试的存储库,到目前为止已经足够快了,但是随着我们继续扩大代码库,我担心它会变得如此缓慢,以至于我的团队将陷入等待 CI 运行完成的困境。
我可以做些什么来加快速度并使我的测试在短期和长期内都更快?
我需要考虑:
python performance continuous-integration pytest github-actions
我的加入执行如下:
SELECT
left.*,
right.*
FROM `/foo/bar/baz` AS left
JOIN `/foo2/bar2/baz2` AS right
ON left.something = right.something
Run Code Online (Sandbox Code Playgroud)
数据集:
/foo/bar/baz
+-----------+-------+
| something | val_1 |
+-----------+-------+
| a | 1 |
| a | 2 |
| a | 3 |
| a | 4 |
| a | 5 |
| a | 6 |
| a | ... |
| a | 10K |
| b | 1 |
| b | 2 |
| b | 3 | …Run Code Online (Sandbox Code Playgroud) 我注意到我的代码存储库警告我在 for/while 循环中使用 withColumn 是一种反模式。为什么不推荐这样做?这不是PySpark API的正常使用吗?
pyspark palantir-foundry foundry-code-repositories foundry-python-transform