lse*_*ohn 5 export save hdfs apache-spark apache-spark-sql
我有一个将数据帧写入文件的测试程序。数据帧是通过为每行添加序号来生成的,例如
1,2,3,4,5,6,7.....11
2,3,4,5,6,7,8.....12
......
Run Code Online (Sandbox Code Playgroud)
数据框中有 100000 行,但我认为它不是太大。当我提交 Spark 任务时,将数据帧写入 HDFS 上的文件大约需要 20 分钟。我想知道为什么这么慢,以及如何提高性能。
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val numCol = 11
val arraydataInt = 1 to 100000 toArray
val arraydata = arraydataInt.map(x => x.toDouble)
val slideddata = arraydata.sliding(numCol).toSeq
val rows = arraydata.sliding(numCol).map { x => Row(x: _*) }
val datasetsize = arraydataInt.size
val myrdd = sc.makeRDD(rows.toSeq, arraydata.size - numCol).persist()
val schemaString = "value1 value2 value3 value4 value5 " +
"value6 value7 value8 value9 value10 label"
val schema =
StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, DoubleType, true)))
val df = sqlContext.createDataFrame(myrdd, schema).cache()
val splitsH = df.randomSplit(Array(0.8, 0.1))
val trainsetH = splitsH(0).cache()
val testsetH = splitsH(1).cache()
println("now saving training and test samples into files")
trainsetH.write.save("TrainingSample.parquet")
testsetH.write.save("TestSample.parquet")
Run Code Online (Sandbox Code Playgroud)
转动
val myrdd = sc.makeRDD(rows.toSeq, arraydata.size - numCol).persist()
Run Code Online (Sandbox Code Playgroud)
到
val myrdd = sc.makeRDD(rows.toSeq, 100).persist()
Run Code Online (Sandbox Code Playgroud)
您已经创建了一个带有分区的 rdd arraydata.size - numCol,每个分区都会导致一个需要额外运行时间的任务。一般来说,分区的数量是并行级别和额外成本之间的权衡。尝试 100 个分区,效果应该会好得多。
顺便说一句,官方指南建议将此数字设置为集群中 CPU 数量的 2 或 3 倍。
| 归档时间: |
|
| 查看次数: |
7354 次 |
| 最近记录: |