DGr*_*ady 1 apache-spark apache-spark-sql pyspark
如何有效地在Spark DataFrame中将列除以其自身的总和,而又不立即触发计算?
假设我们有一些数据:
import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf
spark = SparkSession.builder.master('local').getOrCreate()
data = spark.range(0, 100)
data # --> DataFrame[id: bigint]
Run Code Online (Sandbox Code Playgroud)
我想在此数据框架上创建一个包含“”的新列id / sum(id)。一种方法是预先计算总和,如下所示:
s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]
Run Code Online (Sandbox Code Playgroud)
效果很好,但立即触发了计算;如果为许多列定义类似的内容,将导致对数据进行多次冗余传递。
另一种方法是使用包含整个表的窗口规范:
w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]
Run Code Online (Sandbox Code Playgroud)
在这种情况下,可以定义data3,但是一旦尝试实际计算,Spark 2.2.0会将所有数据移动到单个分区中,这通常会导致大型数据集作业失败。
还有什么其他方法可以解决此问题,而不会触发立即计算并且可以处理大型数据集?我对任何解决方案都感兴趣,不一定是基于的解决方案pyspark。
crossJoin 聚合是一种方法:
data.crossJoin(
data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
Run Code Online (Sandbox Code Playgroud)
但我不用担心:
效果很好,但立即触发了计算;如果为许多列定义类似的内容,将导致对数据进行多次冗余传递。
只需一次计算多个统计信息:
data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()
Run Code Online (Sandbox Code Playgroud)
其余的只是对本地表达式的操作:
data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3906 次 |
| 最近记录: |