我想知道foreachPartitions,与foreach考虑我RDD为了对累加器变量执行一些求和的情况下的方法相比,由于更高的并行度,意志是否会产生更好的性能.
使用Spark 1.4.0,Scala 2.10
我一直试图找出一种方法来使用最后一次已知的观察来转发填充空值,但我没有看到一种简单的方法.我认为这是一件非常常见的事情,但找不到显示如何执行此操作的示例.
我看到函数向前转移填充NaN的值,或滞后/超前函数来填充或移位数据偏移量,但没有任何东西可以获取最后的已知值.
在线查看,我在R中看到很多关于同一件事的Q/A,但在Spark/Scala中没有.
我正在考虑在日期范围内进行映射,从结果中过滤出NaN并选择最后一个元素,但我想我对语法感到困惑.
使用DataFrames我尝试类似的东西
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Run Code Online (Sandbox Code Playgroud)
但这并没有让我任何地方.
过滤器部分不起作用; map函数返回一个spark.sql.Columns序列,但是filter函数需要返回一个Boolean,所以我需要从Column中获取一个值来测试,但似乎只有Column方法返回一个Column.
有没有办法在Spark上更"简单"地做到这一点?
感谢您的输入
编辑:
简单示例示例输入:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Run Code Online (Sandbox Code Playgroud)
预期产量:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Run Code Online (Sandbox Code Playgroud)
注意:
编辑:
按照@ zero323的回答我试过这样:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) } …Run Code Online (Sandbox Code Playgroud) 我试图基于时间序列数据的滑动窗口提取功能.在Scala中,似乎有一个sliding基于这篇文章和文档的函数
import org.apache.spark.mllib.rdd.RDDFunctions._
sc.parallelize(1 to 100, 10)
.sliding(3)
.map(curSlice => (curSlice.sum / curSlice.size))
.collect()
Run Code Online (Sandbox Code Playgroud)
我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?
I'm using pySpark in version 2.3 (cannot update to 2.4 in my current dev-System) and have the following questions concerning the foreachPartition.
First a little context: As far as I understood pySpark-UDFs force the Python-code to be executed outside the Java Virtual Machine (JVM) in a Python-instance, making it performance-costing.
Since I need to apply some Python-functions to my data and want to minimize overhead costs, I had the idea to at least load a handable bunch of …