Spark SQL equivalent of Qualify + Row_number statements

Bri*_*ero 7 sql row-number window-functions apache-spark apache-spark-sql

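Does anyone know the best way to get Apache Spark SQL to produce the same result as a SQL qualify clause combined with rank or row_number?

For example:

  • I have a Spark DataFrame called statement_data with 12 monthly records for each of 100 unique account_numbers, so 1200 records in total
  • Each monthly record has a field called "statement_date" that can be used to determine the most recent record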

I want my final result to be a new Spark Dataframe with the 3 most recent records (as determined by statement_date descending) for each of the 100 unique account_numbers, therefore 300 final records in total.

In standard Teradata SQL, I can do the following:

select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3

As far as I can tell, Apache Spark SQL has no standalone qualify clause. Either I am getting the syntax wrong, or I cannot find any documentation showing that qualify exists.

It is fine if I need to do this in two steps as long as those two steps are:

  • A select query or alternative method to assign rank/row numbering for each account_number's records
  • A select query where I'm selecting all records with rank <= 3 (i.e. choose 1st, 2nd, and 3rd most recent records).
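To make the intended two-step result concrete, here is a minimal plain-Python sketch of the same logic, not Spark code. The helper names add_row_numbers and keep_top_n, the row_num field, and the synthetic 2015 dates are assumptions made for illustration only.

```python
from collections import defaultdict
from datetime import date


def add_row_numbers(records):
    """Step 1: number each account's records, newest statement_date first."""
    by_account = defaultdict(list)
    for rec in records:
        by_account[rec["acct_id"]].append(rec)

    numbered = []
    for acct_id in sorted(by_account):
        ordered = sorted(
            by_account[acct_id],
            key=lambda r: r["statement_date"],
            reverse=True,
        )
        for row_num, rec in enumerate(ordered, start=1):
            numbered.append({**rec, "row_num": row_num})
    return numbered


def keep_top_n(numbered, n=3):
    """Step 2: keep only the rows whose row number is at most n."""
    return [rec for rec in numbered if rec["row_num"] <= n]


# Synthetic stand-in for statement_data: 100 accounts x 12 monthly statements.
statement_data = [
    {"acct_id": acct, "statement_date": date(2015, month, 1)}
    for acct in range(1, 101)
    for month in range(1, 13)
]

top3 = keep_top_n(add_row_numbers(statement_data), n=3)
print(len(top3))  # 300
```

Step 1 corresponds to the row_number() over (partition by ... order by ... desc) part, and step 2 to the qualify ... <= 3 filter.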

EDIT 1 - 7/23 2:09pm: The initial solution provided by zero323 did not work for me in Spark 1.4.1 with the Spark SQL 1.4.1 dependency installed.

EDIT 2 - 7/23 3:24pm: It turns out the error came from running my query through a SQLContext object instead of a HiveContext. I can now run the solution below correctly after adding the following code to create and use a HiveContext:

final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext()); 
....
// Initial Spark/SQL contexts to set up Dataframes  
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary = hc2.sql(
    "SELECT * FROM ("
    + " SELECT acct_id, stmt_end_dt, stmt_curr_bal,"
    + " row_number() OVER (PARTITION BY acct_id ORDER BY stmt_curr_bal DESC) rank_num"
    + " FROM stmt_data"
    + ") tmp WHERE rank_num <= 3");

zer*_*323 9

There is no qualify (it is usually useful to check the parser source), but you can use a subquery like this:

SELECT * FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) rank FROM df
 ) tmp WHERE rank <= 3
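As a quick sanity check of the subquery pattern, the sketch below runs the same shape of query against an in-memory SQLite table from Python's standard library. SQLite stands in for Spark here only to exercise the SQL logic; it assumes SQLite 3.25 or newer (for window functions), synthetic 2015 dates, and the alias rn in place of rank, since rank is also a window function name.

```python
import sqlite3

# In-memory table standing in for the Spark DataFrame registered as "df".
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE df (acct_id INTEGER, statement_date TEXT)")

# Synthetic data: 100 accounts x 12 monthly statements (ISO dates sort as text).
rows = [
    (acct, f"2015-{month:02d}-01")
    for acct in range(1, 101)
    for month in range(1, 13)
]
conn.executemany("INSERT INTO df VALUES (?, ?)", rows)

# Same pattern as above: number rows per account in the inner query,
# then filter on that number in the outer query.
query = """
SELECT acct_id, statement_date, rn FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) AS rn FROM df
) tmp WHERE rn <= 3
ORDER BY acct_id, rn
"""
result = conn.execute(query).fetchall()
print(len(result))  # 300
```

The filter has to sit in the outer query because the row number does not exist yet when the inner query's WHERE clause is evaluated, which is the gap that qualify fills in Teradata.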

See also SPARK : failure: ``union'' expected but `(' found