Scala/Spark数据帧:找到与max相对应的列名

iva*_*ler 6 scala dataframe argmax apache-spark apache-spark-sql

在Scala/Spark中,有一个数据帧:

val dfIn = sqlContext.createDataFrame(Seq(
  ("r0", 0, 2, 3),
  ("r1", 1, 0, 0),
  ("r2", 0, 2, 2))).toDF("id", "c0", "c1", "c2")
Run Code Online (Sandbox Code Playgroud)

我想计算一个新列maxCol拿着名称对应于(各行)的最大值之列.在这个例子中,输出应该是:

+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c1|
+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

实际上,数据框有超过60列.因此,需要通用的解决方案.

Python Pandas中的等价物(是的,我知道,我应该与pyspark进行比较......)可能是:

dfOut = pd.concat([dfIn, dfIn.idxmax(axis=1).rename('maxCol')], axis=1) 
Run Code Online (Sandbox Code Playgroud)

use*_*411 12

通过一个小技巧,您可以使用greatest功能.所需进口:

import org.apache.spark.sql.functions.{col, greatest, lit, struct}
Run Code Online (Sandbox Code Playgroud)

首先让我们创建一个列表structs,其中第一个元素是值,第二个列名称:

val structs = dfIn.columns.tail.map(
  c => struct(col(c).as("v"), lit(c).as("k"))
)
Run Code Online (Sandbox Code Playgroud)

像这样的结构可以传递给greatest如下:

dfIn.withColumn("maxCol", greatest(structs: _*).getItem("k"))
Run Code Online (Sandbox Code Playgroud)
+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c2|
+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

请注意,如果是连接,它将采用序列中较晚出现的元素(按字典顺序排列(x, "c2") > (x, "c1")).如果由于某种原因这是不可接受的,您可以明确减少when:

import org.apache.spark.sql.functions.when

val max_col = structs.reduce(
  (c1, c2) => when(c1.getItem("v") >= c2.getItem("v"), c1).otherwise(c2)
).getItem("k")

dfIn.withColumn("maxCol", max_col)
Run Code Online (Sandbox Code Playgroud)
+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c1|
+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

如果是nullable列,则必须调整此coalescing值,例如通过值to -Inf.