Asked by use*_*251 (score 4) · Tags: scala, apache-spark, apache-spark-sql
I am trying to find the maximum value across multiple columns of a Spark DataFrame. Each column holds a value of type double.
The DataFrame looks like this:
+-----+---+----+---+---+
|Name | A | B | C | D |
+-----+---+----+---+---+
|Alex |5.1|-6.2| 7| 8|
|John | 7| 8.3| 1| 2|
|Alice| 5| 46| 3| 2|
|Mark |-20| -11|-22| -5|
+-----+---+----+---+---+
The expected result is:
+-----+---+----+---+---+----------+
|Name | A | B | C | D | MaxValue |
+-----+---+----+---+---+----------+
|Alex |5.1|-6.2| 7| 8| 8 |
|John | 7| 8.3| 1| 2| 8.3 |
|Alice| 5| 46| 3| 2| 46 |
|Mark |-20| -11|-22| -5| -5 |
+-----+---+----+---+---+----------+
Answer by Leo*_*o C (score 10)
You can apply greatest to a list of the numeric columns, as shown below:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("Alex", 5.1, -6.2, 7.0, 8.0),
  ("John", 7.0, 8.3, 1.0, 2.0),
  ("Alice", 5.0, 46.0, 3.0, 2.0),
  ("Mark", -20.0, -11.0, -22.0, -5.0)
).toDF("Name", "A", "B", "C", "D")

val numCols = df.columns.tail  // Apply suitable filtering as needed (*)

df.withColumn("MaxValue", greatest(numCols.head, numCols.tail: _*)).show
// +-----+-----+-----+-----+----+--------+
// | Name| A| B| C| D|MaxValue|
// +-----+-----+-----+-----+----+--------+
// | Alex| 5.1| -6.2| 7.0| 8.0| 8.0|
// | John| 7.0| 8.3| 1.0| 2.0| 8.3|
// |Alice| 5.0| 46.0| 3.0| 2.0| 46.0|
// | Mark|-20.0|-11.0|-22.0|-5.0| -5.0|
// +-----+-----+-----+-----+----+--------+
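Note that greatest skips null values, returning null only when all of its inputs are null, so a single missing cell won't null out MaxValue. A quick check (a sketch, reusing the imports above):

Seq((Some(1.0), Option.empty[Double])).toDF("A", "B").
  select(greatest($"A", $"B").as("g")).show
// +---+
// |  g|
// +---+
// |1.0|
// +---+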
(*) For example, to keep all the top-level DoubleType columns:
val numCols = df.schema.fields.collect {
  case StructField(name, DoubleType, _, _) => name
}
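If the DataFrame mixes several numeric types, a variant of the same filter (a sketch, assuming only top-level fields matter) could match on the NumericType supertype instead of DoubleType:

val numCols = df.schema.fields.collect {
  case StructField(name, _: NumericType, _, _) => name  // any numeric column
}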
If you are on Spark 2.4+, another option is array_max, although in this case it involves an extra transformation step:
df.withColumn("MaxValue", array_max(array(numCols.map(col): _*)))
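The same pattern also gives the row-wise minimum: least mirrors greatest, and array_min (likewise Spark 2.4+) mirrors array_max. A minimal sketch:

df.withColumn("MinValue", least(numCols.head, numCols.tail: _*))
// or, on Spark 2.4+:
df.withColumn("MinValue", array_min(array(numCols.map(col): _*)))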
Answer by Jon*_*ers (score -1)
First, I reproduced your df:
scala> df.show
+-----+---+----+---+---+
| Name| A| B| C| D|
+-----+---+----+---+---+
| Alex|5.1|-6.2| 7| 8|
| John| 7| 8.3| 1| 2|
|Alice| 5| 46| 3| 2|
| Mark|-20| -11|-22| -5|
+-----+---+----+---+---+
Then I converted it to an RDD and transformed it at the row level:
import scala.math.max
import spark.implicits._  // needed for .toDF on the RDD

case class MyData(Name: String, A: Double, B: Double, C: Double, D: Double, MaxValue: Double)

val maxDF = df.rdd.map { row =>
  val a = row(1).toString.toDouble
  val b = row(2).toString.toDouble
  val c = row(3).toString.toDouble
  val d = row(4).toString.toDouble
  MyData(row(0).toString, a, b, c, d, max(max(a, b), max(c, d)))
}.toDF

Here is the final output:
+-----+-----+-----+-----+----+--------+
| Name| A| B| C| D|MaxValue|
+-----+-----+-----+-----+----+--------+
| Alex| 5.1| -6.2| 7.0| 8.0| 8.0|
| John| 7.0| 8.3| 1.0| 2.0| 8.3|
|Alice| 5.0| 46.0| 3.0| 2.0| 46.0|
| Mark|-20.0|-11.0|-22.0|-5.0| -5.0|
+-----+-----+-----+-----+----+--------+
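As a side note, the same row-level computation can be written against a typed Dataset, which avoids the toString/toDouble round-trip of the RDD map. A minimal sketch, assuming the four value columns are already of type double:

val maxDS = df.as[(String, Double, Double, Double, Double)].map {
  case (name, a, b, c, d) => MyData(name, a, b, c, d, max(max(a, b), max(c, d)))
}.toDF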