How to compose column name using another column's value for withColumn in Scala Spark

Asked by rga*_*101 — tags: scala, apache-spark, apache-spark-sql

I'm trying to add a new column to a DataFrame. The value of this column is the value of another column whose name depends on other columns from the same DataFrame.

For instance, given this:

+---+---+----+----+
|  A|  B| A_1| B_2|
+---+---+----+----+
|  A|  1| 0.1| 0.3|
|  B|  2| 0.2| 0.4|
+---+---+----+----+

I'd like to obtain this:

+---+---+----+----+----+
|  A|  B| A_1| B_2|   C|
+---+---+----+----+----+
|  A|  1| 0.1| 0.3| 0.1|
|  B|  2| 0.2| 0.4| 0.4|
+---+---+----+----+----+

That is, I added a column C whose value comes from either column A_1 or column B_2. The name of the source column is obtained by concatenating the values of columns A and B (A_1 for the first row, B_2 for the second).
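
For reference, here is a minimal sketch of how a DataFrame like this could be built (the column types are my assumption: A as a string, B as an integer, A_1 and B_2 as doubles):

import spark.implicits._

// Hypothetical construction of the sample DataFrame above;
// the real data source and exact column types may differ.
val df = Seq(
  ("A", 1, 0.1, 0.3),
  ("B", 2, 0.2, 0.4)
).toDF("A", "B", "A_1", "B_2")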

I know that I can add a new column based on another and a constant like this:

df.withColumn("C", $"B" + 1)

I also know that the name of the column can come from a variable like this:

val name = "A_1"
df.withColumn("C", col(name) + 1)

However, what I'd like to do is something like this:

df.withColumn("C", col(s"${col("A")}_${col("B")}"))

Which doesn't work: the interpolated string is built on the driver from Column expressions, not from the per-row values of A and B, so it can't produce a different column name for each row.

NOTE: I'm coding in Scala 2.11 and Spark 2.2.

Answer by zer*_*323

You can select from a map. Define a map expression that translates column names into column values:

import org.apache.spark.sql.functions.{col, concat_ws, lit, map}

// Build a map expression of alternating name/value pairs, i.e.
// map(lit("A_1"), col("A_1"), lit("B_2"), col("B_2")), for every column except A and B
val dataMap = map(
  df.columns.diff(Seq("A", "B")).flatMap(c => lit(c) :: col(c) :: Nil): _*
)

df.select(dataMap).show(false)
+---------------------------+
|map(A_1, A_1, B_2, B_2)    |
+---------------------------+
|Map(A_1 -> 0.1, B_2 -> 0.3)|
|Map(A_1 -> 0.2, B_2 -> 0.4)|
+---------------------------+

and look the value up with apply, using a key built by concatenating A and B:

df.withColumn("C", dataMap(concat_ws("_", $"A", $"B"))).show
This produces the C column shown in the desired output above.

You could also try mapping over the rows, but I suspect it won't perform well for very wide data:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Output rows carry the original schema plus the new double column C
val outputEncoder = RowEncoder(df.schema.add(StructField("C", DoubleType)))

df.map(row => {
  // Adjust the type parameters to your actual schema
  // (getAs[Int] matches the sample data, where B holds 1 and 2)
  val a = row.getAs[String]("A")
  val b = row.getAs[Int]("B")
  val key = s"${a}_${b}"
  // Append the value of the column whose name is "<A>_<B>"
  Row.fromSeq(row.toSeq :+ row.getAs[Double](key))
})(outputEncoder).show

In general I wouldn't recommend this approach, since it deserializes every row into JVM objects and gives up the optimizations the column-based version keeps.

If the data comes from CSV, you might consider skipping the default csv reader and pushing the column selection directly into the parsing logic. In pseudocode:

spark.read.text(...).map { line => {
  val a = ???  // parse A
  val b = ???  // parse B
  val c = ???  // find c, based on a and b
  (a, b, c)
}}
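
As a rough sketch of what that could look like (the details below are assumptions, not part of the original answer: comma-separated lines with no header, a fixed column order of A, B, A_1, B_2, and a hypothetical file name data.csv):

import spark.implicits._

// Sketch only: assumes each line has the shape "A,B,A_1,B_2" with no header row.
spark.read.textFile("data.csv").map { line =>
  val fields = line.split(",")
  val a = fields(0)
  val b = fields(1)
  // resolve the "<a>_<b>" lookup while parsing, using the known positions of A_1 and B_2
  val c = if (s"${a}_${b}" == "A_1") fields(2).toDouble else fields(3).toDouble
  (a, b, c)
}.toDF("A", "B", "C")

The point of this variant is that only the columns that are actually needed ever make it into the DataFrame.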