I am new to Spark. I have a large dataset of elements [RDD] and I want to split it into exactly two equal-sized partitions while maintaining the order of the elements. I tried using RangePartitioner like
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
This does not give a satisfactory result because it splits the data only roughly, not into exactly equal sizes, while maintaining the order of the elements. For example, with 64 elements, the RangePartitioner splits them into 31 and 33 elements.
I need a partitioner such that I get exactly the first 32 elements in one half and the other half contains the second set of 32 elements. Could you help me by suggesting how to use a customized partitioner so that I get two equally sized halves, keeping the order of the elements?
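One direction that might work (a minimal sketch, not a definitive answer, assuming zipWithIndex is available in your Spark version and that counting the RDD once up front is acceptable): number every element with zipWithIndex, then route the first half of the positions to one partition and the rest to the other. ExactHalfPartitioner is a hypothetical name, not something built into Spark.

import org.apache.spark.Partitioner

// Hypothetical partitioner: the first `half` positions go to partition 0,
// the remaining positions to partition 1. Assumes the keys are the 0-based
// positions produced by zipWithIndex.
class ExactHalfPartitioner(totalCount: Long) extends Partitioner {
  private val half = (totalCount + 1) / 2
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int =
    if (key.asInstanceOf[Long] < half) 0 else 1
}

val count = partitionedFile.count()
val halves = partitionedFile
  .zipWithIndex()                          // pair every element with its position
  .map { case (elem, idx) => (idx, elem) } // key by position so partitionBy can use it
  .partitionBy(new ExactHalfPartitioner(count))

Because the keys are the original positions, each resulting partition also preserves the original element order.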
I am facing an exception when I apply a method (computeDwt) on an input of type RDD[(Int, ArrayBuffer[(Int, Double)])]. I even used the extends Serialization option to serialize objects in Spark. Here is the code snippet.
input: series: RDD[(Int, ArrayBuffer[(Int, Double)])]
DWTsample extends Serialization is a class having a computeDwt function.
sc: SparkContext
val kk: RDD[(Int, List[Double])] = series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))
Error:
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
Could anyone suggest what the problem might be and what should be done to overcome it?
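From the trace, the NotSerializableException is about org.apache.spark.SparkContext itself: passing sc into computeDwt makes the lambda capture the SparkContext, and Spark then tries to ship it to the executors with the task, which fails. A minimal sketch of the usual way around this, assuming computeDwt can be written to work on the local buffer alone (the changed signature and the placeholder body below are my assumptions, since the original implementation is not shown):

import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

// Hypothetical rewrite: keep the SparkContext on the driver only and let
// computeDwt operate purely on the local data it receives.
class DWTsample extends Serializable {
  def computeDwt(data: ArrayBuffer[(Int, Double)]): List[Double] =
    data.map(_._2).toList // placeholder for the real DWT computation
}

// sc is no longer referenced inside the closure, so nothing non-serializable
// is captured when the task is shipped to the executors.
val kk: RDD[(Int, List[Double])] =
  series.map { case (id, buf) => (id, new DWTsample().computeDwt(buf)) }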