Bha*_*Das 0 apache-spark apache-spark-sql
If we were using a SQL query to get the second-highest value per department, given a table with department and value columns, we could write:
select * from (SELECT department, asset_value, DENSE_RANK() over (partition by DEPARTMENT order by ASSET_VALUE desc) as x from [User1].[dbo].[UsersRecord]) y where x = 2;
But in Apache Spark, if we are restricted from using Spark SQL and must produce the output with the DataFrame API alone, how should the Scala code be written?
I tried working through the documentation, but could not figure it out.
My code:
package com.tg.testpack1

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object class2 {
  def main(args: Array[String]): Unit = {
    val sparksessionobject = SparkSession.builder()
      .master("local")
      .config("spark.sql.warehouse.dir", "C:/Users/ox/spark/spark/spark-warehouse")
      .getOrCreate()

    sparksessionobject.conf.set("spark.sql.shuffle.partitions", 4)
    sparksessionobject.conf.set("spark.executor.memory", "2g")

    import sparksessionobject.implicits._

    val df = Seq(("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700),
                 ("DEPT2", 400), ("DEPT2", 200),
                 ("DEPT3", 500), ("DEPT3", 200))
      .toDF("department", "assetValue")

    df.withColumn("col3", dense_rank().over(Window.partitionBy($"department").orderBy($"assetValue".desc)))
      .show()
  }
}
I get this output:
+----------+----------+----+
|department|assetValue|col3|
+----------+----------+----+
|     DEPT3|       500|   1|
|     DEPT3|       200|   2|
|     DEPT1|      1000|   1|
|     DEPT1|       700|   2|
|     DEPT1|       500|   3|
|     DEPT2|       400|   1|
|     DEPT2|       200|   2|
+----------+----------+----+
The output I expect:
+----------+----------+----+
|department|assetValue|col3|
+----------+----------+----+
|     DEPT1|       700|   2|
|     DEPT2|       200|   2|
|     DEPT3|       200|   2|
+----------+----------+----+
Run Code Online (Sandbox Code Playgroud)
Modify your code as follows:
val byDeptOrderByAssetDesc = Window
  .partitionBy($"department")
  .orderBy($"assetValue".desc)

df.withColumn("col3", dense_rank().over(byDeptOrderByAssetDesc))
  .filter("col3 = 2")
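For intuition, the dense_rank-then-filter logic can be mimicked on plain Scala collections, with no Spark session needed. This is only an illustrative analogue of what the window function computes, not the DataFrame API; the object and method names here are mine:

```scala
object SecondLargestPerDept {
  // Sample rows mirroring the question's DataFrame.
  val rows = Seq(("DEPT1", 1000), ("DEPT1", 500), ("DEPT1", 700),
                 ("DEPT2", 400), ("DEPT2", 200),
                 ("DEPT3", 500), ("DEPT3", 200))

  // dense_rank over (partition by department order by value desc) ranks each
  // value by its position among the DISTINCT values of its department, so
  // "rank 2" is simply the second-largest distinct value per department.
  def secondLargest(data: Seq[(String, Int)]): Map[String, Int] =
    data.groupBy(_._1).flatMap { case (dept, rs) =>
      val distinctDesc = rs.map(_._2).distinct.sorted(Ordering[Int].reverse)
      distinctDesc.lift(1).map(dept -> _) // None if a dept has < 2 distinct values
    }

  def main(args: Array[String]): Unit =
    secondLargest(rows).toSeq.sortBy(_._1).foreach { case (d, v) =>
      println(s"$d,$v")
    }
}
```

Running it prints DEPT1,700 / DEPT2,200 / DEPT3,200, matching the expected DataFrame output. Note that `dense_rank` (unlike `rank`) leaves no gaps after ties, which is why it corresponds to "second-largest distinct value".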