Second highest value per department using an Apache Spark DataFrame

Bha*_*Das 0 apache-spark apache-spark-sql

I currently have a table with the columns department and value available. If we were allowed to use a SQL query to get the second-highest value per department, we could write:

select * from (SELECT department, asset_value, DENSE_RANK() over (partition by DEPARTMENT order by ASSET_VALUE DESC) as x from [User1].[dbo].[UsersRecord]) y where x = 2;

However, in Apache Spark, if we are restricted from using SparkSQL and must produce the output using only the DataFrame API, how should we write the Scala code?

I tried on my own with the help of the documentation, but could not figure it out.

My code:

package com.tg.testpack1

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object class2
{
  def main(args: Array[String])
  {
    val sparksessionobject = SparkSession.builder()
      .master("local")
      .config("spark.sql.warehouse.dir", "C:/Users/ox/spark/spark/spark-warehouse")
      .getOrCreate()
    sparksessionobject.conf.set("spark.sql.shuffle.partitions", 4)
    sparksessionobject.conf.set("spark.executor.memory", "2g")
    import sparksessionobject.sqlContext.implicits._

    val df = Seq(("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700),
                 ("DEPT2", 400), ("DEPT2", 200),
                 ("DEPT3", 500), ("DEPT3", 200))
      .toDF("department", "assetValue")

    df.withColumn("col3", dense_rank().over(Window.partitionBy($"department").orderBy($"assetValue".desc))).show
  }
}

I am getting the output:

+-------------+----------+----+
|   department|assetValue|col3|
+-------------+----------+----+
|        DEPT3|       500|   1|
|        DEPT3|       200|   2|
|        DEPT1|      1000|   1|
|        DEPT1|       700|   2|
|        DEPT1|       500|   3|
|        DEPT2|       400|   1|
|        DEPT2|       200|   2|
+-------------+----------+----+

I am expecting the output:

+-------------+----------+----+
|   department|assetValue|col3|
+-------------+----------+----+
|        DEPT1|       700|   2|
|        DEPT2|       200|   2|
|        DEPT3|       200|   2|
+-------------+----------+----+

Man*_*ish 5

Modify your code as follows:

val byDeptOrderByAssetDesc = Window
  .partitionBy($"department")
  .orderBy($"assetValue".desc)
df.withColumn("col3", dense_rank() over byDeptOrderByAssetDesc)
  .filter("col3=2")
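Putting the answer's window back into the question's program, a minimal end-to-end sketch looks like the following (assuming a local-mode SparkSession; the object name and the `rank` column name are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

object SecondHighestPerDept {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700),
                 ("DEPT2", 400), ("DEPT2", 200),
                 ("DEPT3", 500), ("DEPT3", 200))
      .toDF("department", "assetValue")

    // Rank rows within each department by descending assetValue,
    // then keep only the rows ranked second.
    val byDeptOrderByAssetDesc = Window
      .partitionBy($"department")
      .orderBy($"assetValue".desc)

    df.withColumn("rank", dense_rank().over(byDeptOrderByAssetDesc))
      .filter($"rank" === 2)
      .show()

    spark.stop()
  }
}
```

Note that `dense_rank()` keeps ties at the same rank, so every department with at least two distinct values yields at least one row with rank 2. If window functions must be avoided entirely, a window-free alternative is to compute each department's maximum with `groupBy`/`agg`, join it back, filter out rows equal to the maximum, and take the maximum of what remains.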