相关疑难解决方法(0)

Apache Spark - foreach与foreachPartitions何时使用什么?

我想知道foreachPartitions,与foreach考虑我RDD为了对累加器变量执行一些求和的情况下的方法相比,由于更高的并行度,意志是否会产生更好的性能.

java foreach scala apache-spark

35
推荐指数
3
解决办法
4万
查看次数

Spark/Scala:前进填充最后一次观察

使用Spark 1.4.0,Scala 2.10

我一直试图找出一种方法来使用最后一次已知的观察来转发填充空值,但我没有看到一种简单的方法.我认为这是一件非常常见的事情,但找不到显示如何执行此操作的示例.

我看到函数向前转移填充NaN的值,或滞后/超前函数来填充或移位数据偏移量,但没有任何东西可以获取最后的已知值.

在线查看,我在R中看到很多关于同一件事的Q/A,但在Spark/Scala中没有.

我正在考虑在日期范围内进行映射,从结果中过滤出NaN并选择最后一个元素,但我想我对语法感到困惑.

使用DataFrames我尝试类似的东西

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Run Code Online (Sandbox Code Playgroud)

但这并没有让我任何地方.

过滤器部分不起作用; map函数返回一个spark.sql.Columns序列,但是filter函数需要返回一个Boolean,所以我需要从Column中获取一个值来测试,但似乎只有Column方法返回一个Column.

有没有办法在Spark上更"简单"地做到这一点?

感谢您的输入

编辑:

简单示例示例输入:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Run Code Online (Sandbox Code Playgroud)

预期产量:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Run Code Online (Sandbox Code Playgroud)

注意:

  1. 我有很多列,其中许多都有这种缺失的数据模式,但不是在相同的日期/时间.如果我需要,我将一次完成一列变换.

编辑:

按照@ zero323的回答我试过这样:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) } …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

26
推荐指数
1
解决办法
8269
查看次数

如何在Pyspark中使用滑动窗口对时间序列数据进行数据转换

我试图基于时间序列数据的滑动窗口提取功能.在Scala中,似乎有一个sliding基于这篇文章文档的函数

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
Run Code Online (Sandbox Code Playgroud)

我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?

python time-series apache-spark pyspark

10
推荐指数
2
解决办法
9575
查看次数

pySpark forEachPartition - Where is code executed

I'm using pySpark in version 2.3 (cannot update to 2.4 in my current dev-System) and have the following questions concerning the foreachPartition.

First a little context: As far as I understood pySpark-UDFs force the Python-code to be executed outside the Java Virtual Machine (JVM) in a Python-instance, making it performance-costing. Since I need to apply some Python-functions to my data and want to minimize overhead costs, I had the idea to at least load a handable bunch of …

python pandas apache-spark pyspark

4
推荐指数
1
解决办法
7559
查看次数