在PySpark数据帧中将列和添加为新列

pla*_*lam 25 python apache-spark pyspark spark-dataframe

我正在使用PySpark,我有一个带有一堆数字列的Spark数据帧.我想添加一个列,它是所有其他列的总和.

假设我的数据框有"a","b"和"c"列.我知道我可以这样做:

df.withColumn('total_col', df.a + df.b + df.c)
Run Code Online (Sandbox Code Playgroud)

问题是我不想单独输出每一列并添加它们,特别是如果我有很多列.我希望能够自动执行此操作,或者通过指定要添加的列名列表.还有另一种方法吗?

Pau*_*aul 39

这并不明显.我看不到spark Dataframes API中定义的列的基于行的总和.

版本2

这可以通过一种非常简单的方式完成:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))
Run Code Online (Sandbox Code Playgroud)

df.columns由pyspark提供,作为字符串列表,给出Spark Dataframe中的所有列名.对于不同的总和,您可以提供任何其他列名称列表.

我没有尝试这个作为我的第一个解决方案,因为我不确定它会如何表现.但它的确有效.

版本1

这太复杂了,但也很有效.

你可以这样做:

  1. 用于df.columns获取列名称列表
  2. 使用该名称列表来列出列
  3. 将该列表传递给将以折叠类型函数方式调用列的重载add函数的东西

使用python的reduce,有关运算符重载如何工作的一些知识,以及这里的列的pyspark代码变为:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))
Run Code Online (Sandbox Code Playgroud)

请注意,这是一个python reduce,而不是一个spark RDD reduce,而第二个要减少的参数中的括号术语需要括号,因为它是一个列表生成器表达式.

经过测试,有效!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
Run Code Online (Sandbox Code Playgroud)

  • 版本2不适用于Spark 1.5.0和CDH-5.5.2.和Python版本3.4.它抛出一个错误:"AttributeError:'generator'对象没有属性'_get_object_id" (4认同)
  • 版本2不起作用.引发一个`TypeError:'Column'对象不可调用` (3认同)
  • 版本1对我不起作用,出现错误“列不可迭代” (2认同)

Jon*_*han 16

最直接的方法是使用expr函数

from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
Run Code Online (Sandbox Code Playgroud)


Fra*_*Boi 10

解决方案

newdf = df.withColumn('total', sum(df[col] for col in df.columns))
Run Code Online (Sandbox Code Playgroud)

由@Paul 作品发布。尽管如此,我还是收到了错误,正如我所看到的一样,

TypeError: 'Column' object is not callable
Run Code Online (Sandbox Code Playgroud)

一段时间后,我发现了问题(至少在我的情况下)。问题是我之前用这条线导入了一些pyspark函数

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min
Run Code Online (Sandbox Code Playgroud)

所以该行导入了sumpyspark 命令,而df.withColumn('total', sum(df[col] for col in df.columns))应该使用普通的 pythonsum函数。

您可以删除 pyspark 函数的引用 del sum

否则在我的情况下,我将导入更改为

import pyspark.sql.functions as F
Run Code Online (Sandbox Code Playgroud)

然后将函数引用为F.sum.


Viv*_*asi 8

将列表中的多列汇总为一列

PySpark 的sum函数不支持列添加。这可以使用expr函数来实现。

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))
Run Code Online (Sandbox Code Playgroud)

这为我们提供了所需的列总和。