I'm trying to add a new column to a DataFrame. The value of this column is the value of another column whose name depends on other columns from the same DataFrame.
For instance, given this:
+---+---+----+----+
|  A|  B| A_1| B_2|
+---+---+----+----+
|  A|  1| 0.1| 0.3|
|  B|  2| 0.2| 0.4|
+---+---+----+----+
I'd like to obtain this:
+---+---+----+----+----+
|  A|  B| A_1| B_2|   C|
+---+---+----+----+----+
|  A|  1| 0.1| 0.3| 0.1|
|  B|  2| 0.2| 0.4| 0.4|
+---+---+----+----+----+
That is, I added a column C whose value comes from either column A_1 or B_2, where the name of the source column is obtained by concatenating the values of columns A and B (A_1 for the first row, B_2 for the second).
I know that I can add a new column based on another and a constant like this:
df.withColumn("C", $"B" + 1)
I also know that the name of the column can come from a variable like this:
val name = "A_1"
df.withColumn("C", col(name) + 1)
However, what I'd like to do is something like this:
df.withColumn("C", col(s"${col("A")}_${col("B")}"))
This doesn't work, because col("A") is a Column expression, not a per-row value: the interpolated string ends up being the literal "A_B" built from the columns' names rather than a key built from each row's data.
NOTE: I'm coding in Scala 2.11 and Spark 2.2.
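For reference, the snippets below assume a DataFrame matching the example. A minimal sketch to build one (the column types are my guess: A a string, B an integer, A_1 and B_2 doubles):
import spark.implicits._
val df = Seq(
  ("A", 1, 0.1, 0.3),
  ("B", 2, 0.2, 0.4)
).toDF("A", "B", "A_1", "B_2")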
You can select from a map. First define a map column that translates column names to their values; map takes an interleaved sequence of keys and values, so each candidate column contributes its name as a literal followed by the column itself:
import org.apache.spark.sql.functions.{col, concat_ws, lit, map}

val dataMap = map(
  df.columns.diff(Seq("A", "B")).flatMap(c => lit(c) :: col(c) :: Nil): _*
)
df.select(dataMap).show(false)
+---------------------------+
|map(A_1, A_1, B_2, B_2)    |
+---------------------------+
|Map(A_1 -> 0.1, B_2 -> 0.3)|
|Map(A_1 -> 0.2, B_2 -> 0.4)|
+---------------------------+
and then look the value up by applying the map column to the key built from A and B:
df.withColumn("C", dataMap(concat_ws("_", $"A", $"B"))).show
which produces the desired result:
+---+---+----+----+----+
|  A|  B| A_1| B_2|   C|
+---+---+----+----+----+
|  A|  1| 0.1| 0.3| 0.1|
|  B|  2| 0.2| 0.4| 0.4|
+---+---+----+----+----+
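Note that if the concatenated key doesn't match any column baked into the map (say a row whose key comes out as C_3), the lookup returns null rather than failing, which may or may not be what you want.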
You can also try mapping over the rows yourself, but I suspect it won't perform well on very wide data:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Extend the input schema with the new column C
val outputEncoder = RowEncoder(df.schema.add(StructField("C", DoubleType)))

df.map(row => {
  val a = row.getAs[String]("A")
  val b = row.get(row.fieldIndex("B")).toString // B is numeric in the example, so don't getAs[String]
  val key = s"${a}_${b}"
  Row.fromSeq(row.toSeq :+ row.getAs[Double](key))
})(outputEncoder).show
df.withColumn("C", dataMap(concat_ws("_", $"A", $"B"))).show
Run Code Online (Sandbox Code Playgroud)
In general, though, I wouldn't recommend this approach.
If the data comes from csv, you might consider skipping the default csv reader and pushing the column selection directly into the parsing logic instead. In pseudocode:
spark.read.text(...).map { line =>
  val a = ??? // parse A
  val b = ??? // parse B
  val c = ??? // find C, based on a and b
  (a, b, c)
}
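A concrete version of that sketch might look like this (the file name, header handling, and fixed column positions are all assumptions for illustration):
import spark.implicits._

val parsed = spark.read.textFile("data.csv") // hypothetical input file
  .filter(line => !line.startsWith("A,"))    // skip the header row
  .map { line =>
    val fields = line.split(",")
    val a = fields(0)                        // parse A
    val b = fields(1)                        // parse B
    // find C: pick A_1 (index 2) or B_2 (index 3) based on the key a_b
    val c = s"${a}_${b}" match {
      case "A_1" => fields(2).toDouble
      case "B_2" => fields(3).toDouble
      case _     => Double.NaN               // no matching source column
    }
    (a, b, c)
  }
  .toDF("A", "B", "C")
This way only the three columns you care about ever get materialized, instead of the full width of the file.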