相关疑难解决方法(0)

如何使用Spark查找中值和分位数

如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.

这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.

如何使用Apache Spark计算精确中位数?

使用Scala答案的思考,我试图在Python中编写类似的答案.

我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.

  1. 首先,我在考虑做什么myrdd.sortBy(lambda x: x)
  2. 接下来我将找到rdd(rdd.count())的长度.
  3. 最后,我想在rdd的中心找到元素或2个元素.我也需要这个方法的帮助.

编辑:

我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.

python median apache-spark rdd pyspark

55
推荐指数
3
解决办法
6万
查看次数

SparkException:要汇编的值不能为null

我想StandardScaler用来规范化功能.

这是我的代码:

val Array(trainingData, testData) = dataset.randomSplit(Array(0.7,0.3))
val vectorAssembler = new VectorAssembler().setInputCols(inputCols).setOutputCol("features").transform(trainingData)   
val stdscaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false).fit(vectorAssembler)
Run Code Online (Sandbox Code Playgroud)

但是当我试图使用时,它抛出了异常 StandardScaler

[Stage 151:==>                                                    (9 + 2) / 200]16/12/28 20:13:57 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 151.0 (TID 8922, slave1.hadoop.ml): org.apache.spark.SparkException: Values to assemble cannot be null.
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:159)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:142)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:142)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:97)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-ml

10
推荐指数
1
解决办法
9644
查看次数

使用来自同一列的平均值填充Pyspark数据帧列空值

使用这样的数据帧,

rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, None,"201601")])

df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])
df_data.show()

+---+----+----+-------+
| id|type|cost|   date|
+---+----+----+-------+
|  0|  10| 223| 201601|
|  0|  10|  83|2016032|
|  1|  20|null| 201602|
|  1|  20|3003| 201601|
|  1|  20|null| 201603|
|  2|  40|2321| 201601|
|  2|  30|  10| 201602|
|  2|  61|null| 201601|
+---+----+----+-------+
Run Code Online (Sandbox Code Playgroud)

我需要用现有值的平均值填充空值,预期结果为

+---+----+----+-------+
| id|type|cost|   date|
+---+----+----+-------+
|  0|  10| 223| 201601|
|  0|  10|  83|2016032|
|  1|  20|1128| 201602|
|  1| …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark pyspark-sql

7
推荐指数
1
解决办法
9046
查看次数