有人可以解释一下为什么我们需要分别使用transform
&transform_df
方法吗?
我注意到当我在这里运行与我的示例相同的代码但使用union
orunionByName
或unionAll
代替 时join
,我的查询计划花费的时间要长得多,并且可能导致驱动程序 OOM。
此处包含的代码仅供参考,与for()
循环内部发生的情况略有不同。
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False),
T.StructField("col_2", T.IntegerType(), False),
T.StructField("measure_1", T.FloatType(), False),
T.StructField("measure_2", T.FloatType(), False),
])
data = [
{"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
{"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
T.StructField("col_1", T.IntegerType(), False)
])
right_data = …
Run Code Online (Sandbox Code Playgroud) pyspark palantir-foundry foundry-code-repositories foundry-python-transform
我在代码仓库中看到了 ctx 的使用,这到底是什么?它是内置库吗?我什么时候会使用它?
我在以下示例中看到过它:
df = ctx.spark.createdataframe(...
Run Code Online (Sandbox Code Playgroud) 使用 Python 转换等方式从代码存储库访问内部 Foundry API 的正确方法是什么?
为了概括 python 函数,我想向 python 库添加函数,以便我可以在多个存储库中使用这些函数。任何人请回答以下问题。
1) 如何创建我们自己的 python 库 2) 如何跨多个存储库导入这些库
conda pyspark palantir-foundry foundry-code-repositories foundry-python-transform
如果我将 Magritte 摄取设置为追加,它会检测源数据中是否删除了行吗?它还会删除摄取数据集中的行吗?
我想在 Palantir Foundry 中合并多个数据集,数据集的名称是动态的,因此我无法静态给出数据集名称transform_df()
。有没有一种方法可以动态地将多个输入放入transform_df
并合并所有这些数据帧?
我尝试循环数据集,例如:
li = ['dataset1_path', 'dataset2_path']
union_df = None
for p in li:
@transforms_df(
my_input = Input(p),
Output(p+"_output")
)
def my_compute_function(my_input):
return my_input
if union_df is None:
union_df = my_compute_function
else:
union_df = union_df.union(my_compute_function)
Run Code Online (Sandbox Code Playgroud)
但是,这不会生成联合输出。
union dynamic pyspark palantir-foundry foundry-code-repositories
我正在尝试在 Palantir Foundry 上的 Slate 中显示 PDF 文件。我设法显示存储在 Foundry 上的文件夹中的没有架构的 PDF 文件,但不能显示数据集中的 PDF。
有没有办法显示存储在数据集中的 PDF 文件,或者如何将使用代码存储库从电子邮件文件中提取的 PDF 文件存储到 Foundry 上的文件夹中。
编辑:由于显示存储在数据集中的 PDF 文件似乎很困难。有人可以帮助我调用 API 将 PDF 存储在文件夹中吗?
在现有代码工作簿中使用“导出到代码存储库助手”工具时,修改下游依赖项以指向新创建的代码存储库数据集的最有效方法是什么?
我们想要修改所有下游依赖项,而不是子集。
palantir-foundry foundry-code-repositories foundry-code-workbooks
groupby
在我的主要转换中,我通过执行 a然后applyInPandas
在 Foundry 中运行算法。构建需要很长时间,一种想法是使用哈希分区/分桶来组织文件以防止随机读取和排序。
对于 mcve,我有以下数据集:
def example_df():
return spark.createDataFrame(
[("1","2", 1.0), ("1","3", 2.0), ("2","4", 3.0), ("2","5", 5.0), ("2","2", 10.0)],
("id_1","id_2", "v"))
Run Code Online (Sandbox Code Playgroud)
我想要应用的变换是:
def df1(example_df):
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
return example_df.groupby("id_1","id_2").applyInPandas(subtract_mean, schema="id_1 string, id_2 string, v double")
Run Code Online (Sandbox Code Playgroud)
当我查看没有分区的原始查询计划时,它看起来如下所示:
物理计划:
Execute FoundrySaveDatasetCommand `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`, ErrorIfExists, [id_1, id_2, v], ComputedStatsServiceV2Blocking{_endpointChannelFactory=DialogueChannel@3127a629{channelName=dialogue-nonreloading-ComputedStatsServiceV2Blocking, delegate=com.palantir.dialogue.core.DialogueChannel$Builder$$Lambda$713/0x0000000800807c40@70f51090}, runtime=com.palantir.conjure.java.dialogue.serde.DefaultConjureRuntime@6c67a62a}, com.palantir.foundry.spark.catalog.caching.CachingSchemaService@7d881feb, com.palantir.foundry.spark.catalog.caching.CachingMetadataService@57a1ef9e, com.palantir.foundry.spark.catalog.FoundrySparkResolver@4d38f6f5, com.palantir.foundry.spark.auth.DefaultFoundrySparkAuthSupplier@21103ab4
+- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(3) BasicStats `ri.foundry.main.transaction.00000059-eb1b-61f4-bdb8-a030ac6baf0a@master`.`ri.foundry.main.dataset.eb664037-fcae-4ce2-b92b-bd103cd504b3`
+- FlatMapGroupsInPandas [id_1#487, id_2#488], subtract_mean(id_1#487, id_2#488, v#489), [id_1#497, …
Run Code Online (Sandbox Code Playgroud) partitioning apache-spark pyspark palantir-foundry foundry-code-repositories