How do I add a new column to a Spark DataFrame (using PySpark)?

Bor*_*ris 110 python dataframe apache-spark apache-spark-sql pyspark

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I've tried the following without success:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

I also got an error with this:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?

zer*_*323 175

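You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)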

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

by transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

by including a column from another DataFrame with a join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or by generating it with a function / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.

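If you want to add the content of an arbitrary RDD as a column, you can

  • add row numbers to the existing data frame
  • call zipWithIndex on the RDD and convert it to a data frame
  • join both using the index as the join key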

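  • The "great" thing about the Spark documentation is that it leaves large swaths of usage as an exercise for the astute reader. Spark (and PySpark) covers a veritable zoo of data structures, with little or no instruction on how to convert between them. Case in point: the proliferation of questions just like this one. (6 upvotes)
  • "New columns can be created only by using literals": what exactly does "literal" mean in this context? (3 upvotes)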

Mar*_*cok 57

To add a column using a UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   # ... (further cases elided)
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+


Luk*_*e W 27

For Spark 2.0:

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

  • It needs to be df.select('*', (df.age + 10).alias('agePlusTen')) (2 upvotes)

nee*_*ani 7

There are several ways to add a new column in PySpark.

Let's first create a simple DataFrame.

from pyspark.sql.types import IntegerType

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

Now let's try to double the column value and store it in a new column. Below are a few different approaches that achieve the same result.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

For more examples and explanations of Spark DataFrame functions, you can visit my blog.

I hope this helps.