I have a huge HDFS file containing time series data points (Yahoo stock prices).
I want to find the moving average of the time series. How can I write an Apache Spark job to do that?
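One way to do this is with a window function over the DataFrame API. This is a minimal sketch only: the file path, the column names date and close, and the 5-row window width are all assumptions, not part of the question:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("moving-average").getOrCreate()

# Assumed layout: one row per day with a date and a closing price.
df = spark.read.csv("hdfs:///data/yahoo_prices.csv", header=True, inferSchema=True)

# Average over the current row and the 4 preceding rows.
# Note: a window without partitionBy pulls all rows into one partition.
w = Window.orderBy("date").rowsBetween(-4, 0)
df.withColumn("moving_avg", F.avg("close").over(w)).show()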
I would like to know how to implement the following in Spark (PySpark).
Initial DataFrame:
+--+---+
|id|num|
+--+---+
|4 |9.0|
|3 |7.0|
|2 |3.0|
|1 |5.0|
+--+---+
Resulting DataFrame:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0   |
|3 |7.0| 3.0   |
|2 |3.0| 5.0   |
+--+---+-------+
I managed to "append" a new column to the DataFrame with:
df.withColumn("new_Col", df.num * 10)
But I have no idea how to achieve this "shift of rows" for the new column, so that the new column has the value of a field from the previous row (as in the example). I also couldn't find anything in the API documentation about accessing a row in a DF by index.
Any help would be appreciated.
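A window function with lag does exactly this kind of shift. A minimal sketch against the example data (note that a window without partitionBy moves all rows into a single partition, which Spark will warn about):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# lag("num") takes num from the previous row in the window ordering;
# ordering by id ascending makes "previous" mean the row with id - 1,
# which reproduces the example output (id 1 gets null).
w = Window.orderBy("id")
df.withColumn("new_Col", F.lag("num").over(w)).show()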
Given:
val df = Seq((1, Vector(2, 3, 4)), (1, Vector(2, 3, 4))).toDF("Col1", "Col2")
I have this DataFrame in Apache Spark:
+------+---------+
| Col1 | Col2 |
+------+---------+
| 1 |[2, 3, 4]|
| 1 |[2, 3, 4]|
+------+---------+
How can I convert it to this:
+------+------+------+------+
| Col1 | Col2 | Col3 | Col4 |
+------+------+------+------+
| 1 | 2 | 3 | 4 |
| 1 | 2 | 3 | 4 |
+------+------+------+------+
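One way, sketched in PySpark rather than the Scala above, and assuming Col2 is an array column of known fixed length 3: pull each element out by position with getItem():

from pyspark.sql import functions as F

df.select(
    "Col1",
    F.col("Col2").getItem(0).alias("Col2"),
    F.col("Col2").getItem(1).alias("Col3"),
    F.col("Col2").getItem(2).alias("Col4"),
).show()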
I get the following error when trying to build an ML Pipeline:
pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).'
My features column contains an array of float values. It sounds like I need to convert them to some kind of vector (it's not sparse, so a DenseVector?). Is there a way to do this directly on the DataFrame, or do I need to convert to an RDD?
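One approach that stays on the DataFrame, sketched under the assumption that features is an ArrayType(DoubleType) column as in the error message: wrap each array in a DenseVector with a UDF:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

# Wrap each float array in a DenseVector so ML stages see VectorUDT.
to_vector = F.udf(lambda xs: Vectors.dense(xs), VectorUDT())
df = df.withColumn("features", to_vector("features"))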
The PySpark documentation describes two functions:
mapPartitions(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
And ...
mapPartitionsWithIndex(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
What use cases are these functions meant for? I don't see why they would be needed.
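A common motivation is amortizing per-partition setup cost: work that map() would repeat for every record can be done once per partition. A small runnable sketch (the log-line data is made up for illustration):

import re
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def count_errors(iterator):
    pattern = re.compile(r"error")   # compiled once per partition, not per line
    yield sum(1 for line in iterator if pattern.search(line))

rdd = sc.parallelize(["error: x", "ok", "error: y", "ok"], 2)
print(rdd.mapPartitions(count_errors).sum())   # 2

mapPartitionsWithIndex additionally passes the partition number, which helps when behavior depends on which partition a record came from (e.g. skipping a header line that lives in partition 0 of a text file).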
I have a DataFrame and would like to split it into pieces with the same number of rows.
In other words, I want a list of DataFrames, each of which is a disjoint subset of the original DataFrame.
Suppose the input DataFrame is as follows:
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 100|[15.5097750043269...|
|15.509775004326936| 0| 101|[15.5097750043269...|
|15.509775004326936| 0| 102|[15.5097750043269...|
|15.509775004326936| 0| 103|[15.5097750043269...|
|15.509775004326936| 0| 104|[15.5097750043269...|
|15.509775004326936| 0| 105|[15.5097750043269...|
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 108|[15.5097750043269...|
|15.509775004326936| 0| 109|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| 111|[15.5097750043269...|
|15.509775004326936| 0| 112|[15.5097750043269...|
|15.509775004326936| 0| 113|[15.5097750043269...|
|15.509775004326936| 0| 114|[15.5097750043269...|
|15.509775004326936| 0| 115|[15.5097750043269...|
| 43.01955000865387| 0| 116|[43.0195500086538...|
+------------------+-----------+-----+--------------------+
I would like to split it into K equal-sized DataFrames. If K = 4, a possible result would be:
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| …
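One way to get exact buckets (a sketch, not the only approach): assign each row to one of K buckets with ntile() over some ordering, then filter each bucket out as its own DataFrame. The ordering column count is an assumption taken from the example:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

k = 4
# ntile(k) assigns each row a bucket number 1..k of (almost) equal size.
bucketed = df.withColumn("bucket", F.ntile(k).over(Window.orderBy("count")))
parts = [bucketed.where(F.col("bucket") == i).drop("bucket")
         for i in range(1, k + 1)]

If the pieces only need to be approximately equal, df.randomSplit([1.0] * k) is simpler and avoids the single-partition window sort.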