我在这篇DataBricks 帖子中看到,SparkSql支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。
我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行金额与前一行金额的差值。
在 DataBricks 帖子之后,我提出了这个查询,但它向我抛出了一个异常,我不太明白为什么..
这是在 PySpark 中 .. tx 是我已经在注册为临时表时创建的数据框。
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
Run Code Online (Sandbox Code Playgroud)
和异常(被截断)..
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
Run Code Online (Sandbox Code Playgroud)
我真的很欣赏任何见解,此功能相当新,就现有示例或其他相关帖子而言,没有太多可做的。
编辑
我也尝试在没有 SQL 语句的情况下执行此操作,如下所示,但继续出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。
windowSpec = \
Window \
.partitionBy(h_tx_df_ordered['cc_num']) \
.orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
(lag(h_tx_df_ordered['amt']).over(windowSpec) …
Run Code Online (Sandbox Code Playgroud) 我需要一些帮助,让我的大脑设计出一个(高效的)火花火花链(通过python)。我已经尽力而为了,但是我想出的代码却无法扩展。.基本上,对于各个地图阶段,我都编写了自定义函数,它们对于几千个序列都可以正常工作,但是当我们得到在20,000多个(我最多有80万个)中,速度变慢了。
对于那些不熟悉Markov moodels的人来说,这就是要点。
这是我的数据。此时,我已经在RDD中获得了实际数据(没有标题)。
ID, SEQ
500, HNL, LNH, MLH, HML
Run Code Online (Sandbox Code Playgroud)
我们看一下元组中的序列
(HNL, LNH), (LNH,MLH), etc..
Run Code Online (Sandbox Code Playgroud)
我需要到达这一点..在这里,我返回一个字典(针对每一行数据),然后将其序列化并存储在内存数据库中。
{500:
{HNLLNH : 0.333},
{LNHMLH : 0.333},
{MLHHML : 0.333},
{LNHHNL : 0.000},
etc..
}
Run Code Online (Sandbox Code Playgroud)
因此,从本质上讲,每个序列都与下一个序列结合(HNL,LNH变为“ HNLLNH”),然后对于所有可能的过渡(序列的组合),我们对它们的出现进行计数,然后除以过渡总数(在这种情况下为3)并获取它们的出现频率。
上面有3个转换,其中一个是HNLLNH。因此,对于HNLLNH,1/3 = 0.333
顺便说一句,我不确定是否相关,但是序列中每个位置的值都受到限制。.第一位置(H / M / L),第二位置(M / L),第三位置(H ,M,L)。
我的代码以前做过的事情是collect()rdd,并使用我编写的函数将其映射两次。这些功能首先将字符串转换为列表,然后将list [1]与list [2]合并,然后将list [2]与list [3]合并,然后将list [3]与list [4]合并,依此类推。像这样
[HNLLNH],[LNHMLH],[MHLHML], etc..
Run Code Online (Sandbox Code Playgroud)
然后,下一个函数使用该列表项作为关键字从该列表中创建一个词典,然后计算整个列表中该关键字的总出现次数,除以len(list)以获得频率。然后,我将该字典和它的ID号一起包装在另一个字典中(导致第二个代码块,在上方)。
就像我说的那样,这对于小序列的序列来说效果很好,但是对于长度超过100k的列表而言效果不佳。
另外,请记住,这只是一行数据。我必须在10-20k行数据的任何位置执行此操作,每行数据的长度在500-800,000个序列之间变化。
关于如何编写pyspark代码(使用API map / reduce / agg / etc ..函数)以有效地做到这一点的任何建议?
编辑 代码如下。从底部开始可能是有意义的。请记住,我正在学习这方面的知识(Python和Spark),而我并不是为了谋生而这样做,所以我的编码标准不是很好。
def f(x):
# Custom RDD map function
# Combines two separate transactions …
Run Code Online (Sandbox Code Playgroud)