如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.
这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.
使用Scala答案的思考,我试图在Python中编写类似的答案.
我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.
myrdd.sortBy(lambda x: x)?rdd.count())的长度.编辑:
我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.
我想StandardScaler用来规范化功能.
这是我的代码:
val Array(trainingData, testData) = dataset.randomSplit(Array(0.7,0.3))
val vectorAssembler = new VectorAssembler().setInputCols(inputCols).setOutputCol("features").transform(trainingData)
val stdscaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false).fit(vectorAssembler)
Run Code Online (Sandbox Code Playgroud)
但是当我试图使用时,它抛出了异常 StandardScaler
[Stage 151:==> (9 + 2) / 200]16/12/28 20:13:57 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 151.0 (TID 8922, slave1.hadoop.ml): org.apache.spark.SparkException: Values to assemble cannot be null.
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:159)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:142)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:142)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:97)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) …Run Code Online (Sandbox Code Playgroud) 使用这样的数据帧,
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, None,"201601")])
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])
df_data.show()
+---+----+----+-------+
| id|type|cost| date|
+---+----+----+-------+
| 0| 10| 223| 201601|
| 0| 10| 83|2016032|
| 1| 20|null| 201602|
| 1| 20|3003| 201601|
| 1| 20|null| 201603|
| 2| 40|2321| 201601|
| 2| 30| 10| 201602|
| 2| 61|null| 201601|
+---+----+----+-------+
Run Code Online (Sandbox Code Playgroud)
我需要用现有值的平均值填充空值,预期结果为
+---+----+----+-------+
| id|type|cost| date|
+---+----+----+-------+
| 0| 10| 223| 201601|
| 0| 10| 83|2016032|
| 1| 20|1128| 201602|
| 1| …Run Code Online (Sandbox Code Playgroud)