我在这篇DataBricks 帖子中看到,SparkSql支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。
我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行金额与前一行金额的差值。
在 DataBricks 帖子之后,我提出了这个查询,但它向我抛出了一个异常,我不太明白为什么..
这是在 PySpark 中 .. tx 是我已经在注册为临时表时创建的数据框。
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
Run Code Online (Sandbox Code Playgroud)
和异常(被截断)..
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
Run Code Online (Sandbox Code Playgroud)
我真的很欣赏任何见解,此功能相当新,就现有示例或其他相关帖子而言,没有太多可做的。
编辑
我也尝试在没有 SQL 语句的情况下执行此操作,如下所示,但继续出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。
windowSpec = \
Window \
.partitionBy(h_tx_df_ordered['cc_num']) \
.orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
(lag(h_tx_df_ordered['amt']).over(windowSpec) …Run Code Online (Sandbox Code Playgroud)