相关疑难解决方法(0)

Spark使用上一行的值向数据框添加新列

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.

python dataframe apache-spark apache-spark-sql pyspark

33
推荐指数
1
解决办法
2万
查看次数

Spark中的默认分区方案

当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22

scala> rdd.partitions.size
res9: Int = 10

scala> rdd.partitioner.isDefined
res10: Boolean = true


scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a
Run Code Online (Sandbox Code Playgroud)

它说有10个分区,分区完成使用HashPartitioner.但是当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false
Run Code Online (Sandbox Code Playgroud)

它说有4个分区,并且没有定义分区器.那么,什么是Spark中的默认分区方案?/如何在第二种情况下对数据进行分区?

partitioning apache-spark rdd

17
推荐指数
1
解决办法
5176
查看次数

Apache Spark - 处理时间RDD上的滑动窗口

过去几个月我一直在使用Apache Spark做了很多工作,但现在我收到了一个相当困难的任务,在配对的滑动窗口上计算平均/最小/最大等等,RDD其中Key组件是日期标记, value组件是一个矩阵.因此,每个聚合函数也应该返回一个矩阵,其中对于每个单元格,对该时间段中所有该单元格的平均值进行平均.

我想能够说我想要每7天的平均值,有一天的滑动窗口.滑动窗口移动单元总是一个,然后是窗口大小的单位(所以如果每12周一次,则窗口移动单位为1).

我现在最初想的是简单地迭代,如果我们想要每X天的平均值,X次,并且每次只是按照它的日期对元素进行分组,并使用偏移量.

所以,如果我们有这种情况:

天:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

矩阵:ABCDEFGHIJKLMNO

我们想要每5天的平均值,我将迭代5次并在此处显示分组:

第一次迭代:

第1组:(1,A)(2,B)(3,C)(4,D)(5,E)

第2组:(6,F)(7,G)(8,H)(9,I)(10,J)

第3组:(11,K)(12,L)(13,M)(14,N)(15,O)

第二次迭代:

第1组:(2,B)(3,C)(4,D)(5,E)(6,F)

第2组:(7,G)(8,H)(9,I)(10,J),(11,K)

第3组:(12,L)(13,M)(14,N)(15,O)

Etcetera,对于每个组,我必须做一个折叠/缩小程序来获得平均值.

然而,正如您可能想象的那样,这非常缓慢,可能是一种相当糟糕的方法.我无法找到更好的方法来做到这一点.

algorithm scala apache-spark

16
推荐指数
1
解决办法
6051
查看次数

如何在Pyspark中使用滑动窗口对时间序列数据进行数据转换

我试图基于时间序列数据的滑动窗口提取功能.在Scala中,似乎有一个sliding基于这篇文章文档的函数

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
Run Code Online (Sandbox Code Playgroud)

我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?

python time-series apache-spark pyspark

10
推荐指数
2
解决办法
9575
查看次数

如何进行时间序列简单预测?

我有一个时间序列的单变量数据.所以只是TimeStamp和Value.现在我想推断(预测)第二天/月/年的这个值.我知道有Box-jenkins(ARIMA)等方法.

Spark有线性回归,我尝试过,但是没有得到满意的结果.有没有人在Spark中尝试过时间序列的简单预测.可以分享实施方法吗?

PS:我在用户邮件列表中检查了这个问题,几乎所有关于这个问题的问题都没有得到答复.

scala time-series apache-spark

9
推荐指数
2
解决办法
7945
查看次数