SparkSQL - 滞后函数?

nam*_*don 5 sql window-functions apache-spark apache-spark-sql pyspark

我在这篇DataBricks 帖子中看到,SparkSql支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。

我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行金额与前一行金额的差值。

在 DataBricks 帖子之后,我提出了这个查询,但它向我抛出了一个异常,我不太明白为什么..

这是在 PySpark 中 .. tx 是我已经在注册为临时表时创建的数据框。

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
Run Code Online (Sandbox Code Playgroud)

和异常(被截断)..

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
Run Code Online (Sandbox Code Playgroud)

我真的很欣赏任何见解,此功能相当新,就现有示例或其他相关帖子而言,没有太多可做的。

编辑

我也尝试在没有 SQL 语句的情况下执行此操作,如下所示,但继续出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()
Run Code Online (Sandbox Code Playgroud)
Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
Run Code Online (Sandbox Code Playgroud)

zer*_*323 6

  1. 框架规范应以关键字ROWSnot开头ROW
  2. 帧规范需要下限值

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    
    Run Code Online (Sandbox Code Playgroud)

    UNBOUNDED关键字

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    
    Run Code Online (Sandbox Code Playgroud)
  3. LAG 函数根本不接受框架,因此具有滞后的正确 SQL 查询可能如下所示

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
         PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    ) as prev_amt from tx
    
    Run Code Online (Sandbox Code Playgroud)

编辑

关于 SQL DSL 的使用:

  1. 正如您在错误消息中所读到的那样

    请注意,当前使用窗口函数需要一个 HiveContex

    请务必sqlContext使用HiveContextnot进行初始化SQLContext

  2. windowSpec.rowsBetween(-1, 0)什么都不做,但该lag函数再次不支持帧规范。