计算每行并在 DataFrame PySpark 中添加新列 - 更好的解决方案?

fee*_*hka 3 dataframe apache-spark rdd apache-spark-sql pyspark

我在 PySpark 中使用数据框,我有以下任务:检查所有列中每列中有多少“次”值 > 2。对于 u1 来说它是 0,对于 u2 => 2 等等

user    a   b   c   d   times
   u1   1   0   1   0   0
   u2   0   1   4   3   2
   u3   2   1   7   0   1
Run Code Online (Sandbox Code Playgroud)

我的解决方案如下。它有效,我不确定这是最好的方法,并且还没有尝试过真正的大数据。我不喜欢转换为 rdd 并返回数据框。有更好的吗?我一开始想按每列的 UDF 进行计算,但没有找到一种方法来累加和总结每行的所有结果:

user    a   b   c   d   times
   u1   1   0   1   0   0
   u2   0   1   4   3   2
   u3   2   1   7   0   1
Run Code Online (Sandbox Code Playgroud)

对于这个解决方案,我使用了这个主题 如何将 numpy.array 作为新列添加到 pyspark.SQL DataFrame?

谢谢!

zer*_*323 6

这只是简单的一句台词。示例数据:

df = sc.parallelize([
    ("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
]).toDF(["user", "a", "b", "c", "d"])
Run Code Online (Sandbox Code Playgroud)

withColumn

df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))
Run Code Online (Sandbox Code Playgroud)

结果:

+----+---+---+---+---+-----+
|user|  a|  b|  c|  d|times|
+----+---+---+---+---+-----+
|  u1|  1|  0|  1|  0|    0|
|  u2|  0|  1|  4|  3|    2|
|  u3|  2|  1|  7|  0|    1|
+----+---+---+---+---+-----+
Run Code Online (Sandbox Code Playgroud)

笔记:

您应该更正它的列nullable,例如使用coalesce

from pyspark.sql.functions import coalesce

sum(coalesce((df[c] > 2).cast("int"), 0) for c in df.columns[1:])
Run Code Online (Sandbox Code Playgroud)