有人可以解释一下为什么我们需要分别使用transform
&transform_df
方法吗?
我注意到当我在这里运行与我的示例相同的代码但使用union
orunionByName
或unionAll
代替 时join
,我的查询计划花费的时间要长得多,并且可能导致驱动程序 OOM。
此处包含的代码仅供参考,与for()
循环内部发生的情况略有不同。
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("measure_1", T.FloatType(), False),
T.StructField("measure_2", T.FloatType(), False),
])
data = [
{"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
{"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False)
])
right_data = …
Run Code Online (Sandbox Code Playgroud) pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我在代码仓库中看到了 ctx 的使用,这到底是什么?它是内置库吗?我什么时候会使用它?
我在以下示例中看到过它:
df = ctx.spark.createdataframe(...
Run Code Online (Sandbox Code Playgroud) 使用 Python 转换等方式从代码存储库访问内部 Foundry API 的正确方法是什么?
为了概括 python 函数,我想向 python 库添加函数,以便我可以在多个存储库中使用这些函数。任何人请回答以下问题。
1) 如何创建我们自己的 python 库 2) 如何跨多个存储库导入这些库
conda pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我有一个数据源,每天都会提供一个大的 .txt 文件(50-75GB)。该文件包含多个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,如何有效地做到这一点?
我已经通过数据连接将一个包含 100,000 个大约 100GB 的原始 json 文件的数据集导入到代工厂中。我想使用Python Transforms raw file access
转换来读取文件,将结构和结构的数组展平到数据帧中,作为对 df 的增量更新。我想使用来自 *.json 文件的文档中的以下示例中的内容,并将其转换为使用@incremental()
装饰器更新的增量。
>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
... processed=Output('/examples/hair_eye_color_processed'),
... hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
... def process_file(file_status):
... with hair_eye_color.filesystem().open(file_status.path) as f:
... r = csv.reader(f)
...
... # Construct a pyspark.Row from our header row
... header = next(r)
... MyRow = Row(*header)
...
... …
Run Code Online (Sandbox Code Playgroud) pyspark palantir-foundry foundry-code-repositories foundry-code-workbooks
我想在 Palantir Foundry 中合并多个数据集,数据集的名称是动态的,因此我无法静态给出数据集名称transform_df()
。有没有一种方法可以动态地将多个输入放入transform_df
并合并所有这些数据帧?
我尝试循环数据集,例如:
li = ['dataset1_path', 'dataset2_path']
union_df = None
for p in li:
@transforms_df(
my_input = Input(p),
Output(p+"_output")
)
def my_compute_function(my_input):
return my_input
if union_df is None:
union_df = my_compute_function
else:
union_df = union_df.union(my_compute_function)
Run Code Online (Sandbox Code Playgroud)
但是,这不会生成联合输出。
union dynamic pyspark palantir-foundry foundry-code-repositories
我有一组.xml
想要解析的文档。
我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:
我的第一个转换获取.xml
内容并将其放入单个单元格中,第二个转换获取该字符串并使用 Python 的xml库将该字符串解析为文档。然后我可以从该文档中提取属性并返回 DataFrame。
我正在使用UDF来执行将字符串内容映射到我想要的字段的过程。
我怎样才能让这个更快/更好地处理大.xml
文件?
pyspark palantir-foundry foundry-code-repositories foundry-python-transform
在现有代码工作簿中使用“导出到代码存储库助手”工具时,修改下游依赖项以指向新创建的代码存储库数据集的最有效方法是什么?
我们想要修改所有下游依赖项,而不是子集。
palantir-foundry foundry-code-repositories foundry-code-workbooks