How do I find the maximum value across multiple columns?

use*_*251 4 scala apache-spark apache-spark-sql

I am trying to find the maximum value across multiple columns of a Spark DataFrame. Each column holds values of type double.

The DataFrame looks like this:

+-----+---+----+---+---+
|Name | A | B  | C | D |
+-----+---+----+---+---+
|Alex |5.1|-6.2|  7|  8|
|John |  7| 8.3|  1|  2|
|Alice|  5|  46|  3|  2|
|Mark |-20| -11|-22| -5|
+-----+---+----+---+---+

The expected result is:

+-----+---+----+---+---+----------+
|Name | A | B  | C | D | MaxValue |
+-----+---+----+---+---+----------+
|Alex |5.1|-6.2|  7|  8|     8    |
|John |  7| 8.3|  1|  2|     8.3  | 
|Alice|  5|  46|  3|  2|     46   |
|Mark |-20| -11|-22| -5|     -5   |
+-----+---+----+---+---+----------+

Leo*_*o C 10

You can apply greatest to the list of numeric columns, as follows:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("Alex", 5.1, -6.2, 7.0, 8.0),
  ("John", 7.0, 8.3, 1.0, 2.0),
  ("Alice", 5.0, 46.0, 3.0, 2.0),
  ("Mark", -20.0, -11.0, -22.0, -5.0)  // no trailing comma, so this also compiles on Scala 2.11
).toDF("Name", "A", "B", "C", "D")

val numCols = df.columns.tail  // Apply suitable filtering as needed (*)

df.withColumn("MaxValue", greatest(numCols.head, numCols.tail: _*)).
  show
// +-----+-----+-----+-----+----+--------+
// | Name|    A|    B|    C|   D|MaxValue|
// +-----+-----+-----+-----+----+--------+
// | Alex|  5.1| -6.2|  7.0| 8.0|     8.0|
// | John|  7.0|  8.3|  1.0| 2.0|     8.3|
// |Alice|  5.0| 46.0|  3.0| 2.0|    46.0|
// | Mark|-20.0|-11.0|-22.0|-5.0|    -5.0|
// +-----+-----+-----+-----+----+--------+

(*) For example, to select all top-level DoubleType columns:

import org.apache.spark.sql.types._

val numCols = df.schema.fields.collect{
  case StructField(name, DoubleType, _, _) => name
}
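If the numeric columns are not all DoubleType, the same collect pattern can be widened. The following is only a sketch: the local SparkSession, the `mixed` DataFrame, and the names `numericCols` and `withMax` are hypothetical and introduced purely for illustration. It matches on NumericType, the common parent of Spark's numeric types, and casts each selected column to double before calling greatest so that all arguments share one type.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, greatest}
import org.apache.spark.sql.types.{NumericType, StructField}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("max-demo").getOrCreate()
import spark.implicits._

// Hypothetical data: A is an Int column, B and C are Double columns.
val mixed = Seq(
  ("Alex", 5, -6.2, 7.0),
  ("Mark", -20, -11.0, -22.0)
).toDF("Name", "A", "B", "C")

// Keep every top-level column whose data type is numeric (Int, Long, Double, ...).
val numericCols = mixed.schema.fields.collect {
  case StructField(name, _: NumericType, _, _) => name
}

// Cast explicitly so greatest compares values of a single type.
val withMax = mixed.withColumn(
  "MaxValue",
  greatest(numericCols.map(c => col(c).cast("double")): _*)
)
```

With these two sample rows, MaxValue should come out as 7.0 for Alex and -11.0 for Mark. Note that greatest needs at least two columns, so the sketch assumes the filter keeps two or more.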

If you are on Spark 2.4+, an alternative is array_max, although in this case it involves an extra step to pack the columns into an array first:

df.withColumn("MaxValue", array_max(array(numCols.map(col): _*)))
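For completeness, here is a self-contained sketch of the array_max variant using the sample data from the question. It assumes Spark 2.4 or later; the local SparkSession and the name `viaArray` are assumptions added for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, array_max, col}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("array-max-demo").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alex", 5.1, -6.2, 7.0, 8.0),
  ("John", 7.0, 8.3, 1.0, 2.0),
  ("Alice", 5.0, 46.0, 3.0, 2.0),
  ("Mark", -20.0, -11.0, -22.0, -5.0)
).toDF("Name", "A", "B", "C", "D")

// Every column except the first (Name) is numeric in this sample.
val numCols = df.columns.tail

// array(...) packs the numeric columns into a single array column (the extra step),
// and array_max then reduces that array to its largest element.
val viaArray = df.withColumn("MaxValue", array_max(array(numCols.map(col): _*)))
```

This should yield the same MaxValue column as the greatest version (8.0, 8.3, 46.0, -5.0). The intermediate array is the extra step mentioned above, so greatest is likely the more direct choice when the columns are already separate.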


Jon*_*ers -1

First, I reproduced your df:

scala> df.show
+-----+---+----+---+---+
| Name|  A|   B|  C|  D|
+-----+---+----+---+---+
| Alex|5.1|-6.2|  7|  8|
| John|  7| 8.3|  1|  2|
|Alice|  5|  46|  3|  2|
| Mark|-20| -11|-22| -5|
+-----+---+----+---+---+


Then I converted it to an RDD and transformed it at the row level:

import scala.math.max
import spark.implicits._  // needed for .toDF on an RDD of case classes

case class MyData(Name: String, A: Double, B: Double, C: Double, D: Double, MaxValue: Double)

val maxDF = df.rdd.map { row =>
  // toString.toDouble parses the value whether the column is stored as a string or as a double
  val a = row(1).toString.toDouble
  val b = row(2).toString.toDouble
  val c = row(3).toString.toDouble
  val d = row(4).toString.toDouble
  // The row maximum is the larger of the two pairwise maxima
  MyData(row(0).toString, a, b, c, d, max(max(a, b), max(c, d)))
}.toDF

This is the final output:

+-----+-----+-----+-----+----+--------+
| Name|    A|    B|    C|   D|MaxValue|
+-----+-----+-----+-----+----+--------+
| Alex|  5.1| -6.2|  7.0| 8.0|     8.0|
| John|  7.0|  8.3|  1.0| 2.0|     8.3|
|Alice|  5.0| 46.0|  3.0| 2.0|    46.0|
| Mark|-20.0|-11.0|-22.0|-5.0|    -5.0|
+-----+-----+-----+-----+----+--------+