Apache Spark移动平均线

Ahm*_*bib 37 time-series moving-average hdfs apache-spark

我有一个巨大的HDFS文件,有时间序列数据点(雅虎股票价格).

我想找到时间序列的移动平均值我如何编写Apache Spark工作来做到这一点.

小智 29

您可以使用MLLIB中的滑动功能,它可能与Daniel的答案完全相同.在使用滑动功能之前,您必须按时间对数据进行排序.

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
Run Code Online (Sandbox Code Playgroud)


Dan*_*bos 22

移动平均线对于Spark和任何分布式系统来说都是一个棘手的问题.当数据分布在多台机器上时,会有一些时间窗口跨越分区.我们必须在分区的开头复制数据,以便计算每个分区的移动平均值可以完全覆盖.

这是一种在Spark中执行此操作的方法.示例数据:

val ts = sc.parallelize(0 to 100, 10)
val window = 3
Run Code Online (Sandbox Code Playgroud)

一个简单的分区程序,它将每行放在我们通过键指定的分区中:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}
Run Code Online (Sandbox Code Playgroud)

使用window - 1复制到上一个分区的第一行创建数据:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  val overlap = p.take(window - 1).toArray
  val spill = overlap.iterator.map((i - 1, _))
  val keep = (overlap.iterator ++ p).map((i, _))
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values
Run Code Online (Sandbox Code Playgroud)

只需计算每个分区的移动平均值:

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator
  val news = sorted.iterator
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n
    val v = sum
    sum -= o
    v
  }})
})
Run Code Online (Sandbox Code Playgroud)

由于重复的细分,因此覆盖范围没有差距.

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
Run Code Online (Sandbox Code Playgroud)

  • 为什么不能通过遍历RDD来做到这一点?这将按顺序返回分区……然后您只需要复制 RDD 末尾的部分。我想知道 updateStateByKey 是否有助于使事情变得更容易。 (2认同)
  • 这是一种有趣的方法,但您做出了一个冒险的假设,即没有空的 / 到短的分区。例如:`val m = Map(1 -> (0 to 50).toIterator, 4 -> (51 to 100).toIterator).withDefault(i => Iterator()); val ts = sc.parallelize(Seq.empty[Int], 10).mapPartitionsWithIndex((i, _) => m(i))` (2认同)

olu*_*ies 15

Spark 1.4引入了窗口函数,这意味着你可以按照以下方式进行移动平均:使用rowsBetween调整窗口:

val schema = Seq("id", "cykle", "value")
 val data = Seq(
        (1, 1, 1),
        (1, 2, 11),
        (1, 3, 1),
        (1, 4, 11),
        (1, 5, 1),
        (1, 6, 11),
        (2, 1, 1),
        (2, 2, 11),
        (2, 3, 1),
        (2, 4, 11),
        (2, 5, 1),
        (2, 6, 11)
      )

val dft = sc.parallelize(data).toDF(schema: _*)

dft.select('*).show

// PARTITION BY id  ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)

val x = dft.select($"id",$"cykle",avg($"value").over(w))
x.show
Run Code Online (Sandbox Code Playgroud)

输出(齐柏林飞艇):

schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
|  1|    1|    1|
|  1|    2|   11|
|  1|    3|    1|
|  1|    4|   11|
|  1|    5|    1|
|  1|    6|   11|
|  2|    1|    1|
|  2|    2|   11|
|  2|    3|    1|
|  2|    4|   11|
|  2|    5|    1|
|  2|    6|   11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
|  1|    1|                                                        4.333333333333333|
|  1|    2|                                                                      6.0|
|  1|    3|                                                                      5.0|
|  1|    4|                                                                      7.0|
|  1|    5|                                                                      6.0|
|  1|    6|                                                        7.666666666666667|
|  2|    1|                                                        4.333333333333333|
|  2|    2|                                                                      6.0|
|  2|    3|                                                                      5.0|
|  2|    4|                                                                      7.0|
|  2|    5|                                                                      6.0|
|  2|    6|                                                        7.666666666666667|
+---+-----+————————————————————————————————————+
Run Code Online (Sandbox Code Playgroud)