If we were producing the second-highest value per department with a SQL query, given a table where the department and asset_value columns are available, we could write:
select * from (SELECT department, asset_value, DENSE_RANK() over (partition by DEPARTMENT order by ASSET_VALUE desc) as x from [User1].[dbo].[UsersRecord]) y where x = 2;
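In Spark itself the same query can be run through a temporary view, which is useful as a baseline (a sketch, not part of the original question; it assumes a SparkSession named spark and a DataFrame df with department and assetValue columns, and the view name UsersRecord is illustrative):

// Register the DataFrame under a name so it can be queried with SQL
df.createOrReplaceTempView("UsersRecord")
val secondHighest = spark.sql(
  """SELECT department, assetValue
    |FROM (SELECT department, assetValue,
    |             DENSE_RANK() OVER (PARTITION BY department ORDER BY assetValue DESC) AS x
    |      FROM UsersRecord) ranked
    |WHERE x = 2""".stripMargin)
secondHighest.show()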
However, in Apache Spark, if we are constrained not to use Spark SQL and must produce the output using only the DataFrame API, how should the Scala code be written?
I tried on my own with help from the documentation, but could not figure it out.
My code:
package com.tg.testpack1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

object class2
{
  def main(args: Array[String]): Unit =
  {
    val sparksessionobject = SparkSession.builder()
      .master("local")
      .config("spark.sql.warehouse.dir", "C:/Users/ox/spark/spark/spark-warehouse")
      .getOrCreate()
    sparksessionobject.conf.set("spark.sql.shuffle.partitions", 4)
    sparksessionobject.conf.set("spark.executor.memory", "2g")
    import sparksessionobject.implicits._

    val df = Seq(("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700),
                 ("DEPT2", 400), ("DEPT2", 200),
                 ("DEPT3", 500), ("DEPT3", 200))
      .toDF("department", "assetValue")

    // Rank the rows within each department, highest assetValue first
    df.withColumn("col3", dense_rank().over(Window.partitionBy($"department").orderBy($"assetValue".desc))).show
  }
}
I get this output:
+-------------+----------+----+
|   department|assetValue|col3|
+-------------+----------+----+
|        DEPT3|       500|   1|
|        DEPT3|       200|   2|
|        DEPT1|      1000|   1|
|        DEPT1|       700|   2|
|        DEPT1|       500|   3|
|        DEPT2|       400|   1|
|        DEPT2|       200|   2|
+-------------+----------+----+
I expect this output:
+-------------+----------+----+
|   department|assetValue|col3|
+-------------+----------+----+
|        DEPT1|       700|   2|
|        DEPT2|       200|   2|
|        DEPT3|       200|   2|
+-------------+----------+----+
Modify your code as follows:
val byDeptOrderByAssetDesc = Window
  .partitionBy($"department")
  .orderBy($"assetValue".desc)

df.withColumn("col3", dense_rank().over(byDeptOrderByAssetDesc))
  .filter("col3 = 2")
  .show
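A note on the choice of ranking function: dense_rank assigns consecutive ranks with no gaps, so even if the top value is tied, the second-highest distinct value still gets rank 2, whereas rank would jump to 3 and row_number would arbitrarily keep just one of the tied rows. A minimal sketch of the difference, assuming the same window spec as above and that rank and row_number are also imported from org.apache.spark.sql.functions:

// Compare the three ranking functions over the same window
df.withColumn("dr", dense_rank().over(byDeptOrderByAssetDesc))
  .withColumn("r",  rank().over(byDeptOrderByAssetDesc))
  .withColumn("rn", row_number().over(byDeptOrderByAssetDesc))
  .show()
// For values (1000, 1000, 700) in one department the rows rank as
// dr = 1, 1, 2 but r = 1, 1, 3 and rn = 1, 2, 3, so only dense_rank
// keeps the filter "col3 = 2" pointing at the second-highest value.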