将多列附加到spark中的现有数据帧

nat*_*nat 4 scala bigdata apache-spark apache-spark-sql

我需要将多列附加到现有的 spark 数据框,其中列名在 List 中给出,假设新列的值是恒定的,例如给定的输入列和数据框是

val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))
Run Code Online (Sandbox Code Playgroud)

并在附加两列后,假设 col1 的常量值为“val1”,col2 的常量值为“val2”,则输出数据帧应为

+-----+---+-------+------+
|   _1| _2|col1   |col2|
+-----+---+-------+------+
|  one|  1|val1   |val2|
|  two|  2|val1   |val2|
|three|  3|val1   |val2|
| four|  4|val1   |val2|
+-----+---+-------+------+
Run Code Online (Sandbox Code Playgroud)

我写了一个函数来追加列

def appendColumns (cols: List[String], ds: DataFrame): DataFrame = {

            cols match {

                case Nil => ds
                case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
                case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))

            }
        }
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法和更实用的方法来做到这一点。

谢谢

Oli*_*Oli 6

是的,有一个更好更简单的方法。基本上,您调用的次数与withColumn列数一样多。有很多列,催化剂,优化火花查询的引擎可能会感觉有点不知所措(我过去有过类似用例的经验)。我什至看到它在试验数千列时导致驱动程序 OOM。为了避免给催化剂带来压力(并编写更少的代码;-)),您可以简单地使用select如下所示的方式在一个 spark 命令中完成此操作:

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Let's create the new columns from the map
val newCols = columnMap.keys.map(k => lit(columnMap(k)) as k)
// selecting the old columns + the new ones
data.select(data.columns.map(col) ++ newCols : _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+
Run Code Online (Sandbox Code Playgroud)


the*_*tom 5

与递归相反,对于有限数量的列,我认为使用 FoldLeft 的更通用方法更通用。使用 Databricks 笔记本:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val columnNames = Seq("c3","c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")

def addCols(df: DataFrame, columns: Seq[String]): DataFrame = {
    columns.foldLeft(df)((acc, col) => {
      acc.withColumn(col, lit(col)) })
}

val df2 = addCols(df, columnNames)
df2.show(false)
Run Code Online (Sandbox Code Playgroud)

返回:

+-----+---+---+---+
|c1   |c2 |c3 |c4 |
+-----+---+---+---+
|one  |1  |c3 |c4 |
|two  |2  |c3 |c4 |
|three|3  |c3 |c4 |
|four |4  |c3 |c4 |
+-----+---+---+---+
Run Code Online (Sandbox Code Playgroud)

请注意以下内容:https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 ,尽管上下文略有不同,另一个答案通过 select 方法暗示了这一点。