我想知道如何在任务之间传输数据而不将它们存储在任务之间。附图可以找到任务流程。截至目前,我将每个任务的输出 csv 文件作为文件存储在本地计算机中,并再次获取此 csv 文件作为下一个任务的输入。我想知道是否有其他方法可以在任务之间传递数据而不在每个任务后存储数据。我做了一些研究,发现了Xcoms。我想确定Xcoms是否是实现这一目标的正确方法,还是我错了。我找不到任何实际的例子。任何帮助将不胜感激,因为我只是气流方面的新手,刚开始几天
我有一个火花df
spark_df = spark.createDataFrame(
[(1, 7, 'foo'),
(2, 6, 'bar'),
(3, 4, 'foo'),
(4, 8, 'bar'),
(5, 1, 'bar')
],
['v1', 'v2', 'id']
)
Run Code Online (Sandbox Code Playgroud)
预期输出
id avg(v1) avg(v2) min(v1) min(v2) 0.25(v1) 0.25(v2) 0.5(v1) 0.5(v2)
0 bar 3.666667 5.0 2 1 some-value some-value some-value some-value
1 foo 2.000000 5.5 1 4. some-value some-value some-value some-value
Run Code Online (Sandbox Code Playgroud)
到目前为止,我已经可以实现平均值、最小值、最大值等基本统计数据。但无法获得分位数。我知道,这可以在 Pandas 中轻松实现,但无法在 Pyspark 中完成
另外,我知道 approxQuantile,但我无法将基本统计数据与 pyspark 中的分位数结合起来
到目前为止,我可以使用 agg 获得平均值和最小值等基本统计数据。我也想要相同 df 中的分位数
func = [F.mean, F.min,]
NUMERICAL_FEATURE_LIST = ['v1', 'v2']
GROUP_BY_FIELDS = ['id']
exp …Run Code Online (Sandbox Code Playgroud)