V. *_*mma 21 csv hadoop scala apache-spark
我想在AWS中创建一个数据处理管道,最终将处理后的数据用于机器学习.
我有一个Scala脚本,它从S3获取原始数据,处理它并使用Spark-CSV将其写入HDFS甚至S3 .如果我想使用AWS Machine Learning工具来训练预测模型,我想我可以使用多个文件作为输入.但是如果我想使用别的东西,我认为最好是收到一个CSV输出文件.
目前,因为我不希望使用的重新分配(1) ,也不合并(1)用于提高性能的目的,我已经使用了Hadoop的FS -getmerge手动测试,但它只是合并作业输出文件的内容,我遇到了一个小问题.我需要在数据文件中使用单行标题来训练预测模型.
如果我使用.option("header","true")
spark-csv,那么它会将标头写入每个输出文件,并且在合并之后我在数据中有与输出文件一样多的标题行.但是如果header选项为false,则它不会添加任何标头.
现在我找到了一个选项,可以将Scala脚本中的文件与Hadoop API合并FileUtil.copyMerge
.我尝试spark-shell
使用下面的代码.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Run Code Online (Sandbox Code Playgroud)
但是这个解决方案仍然只是将文件连接在一起,并且不处理头文件.如何获得只有一行标题的输出文件?
我甚至尝试添加df.columns.mkString(",")
作为最后一个参数copyMerge
,但这仍然多次添加标题,而不是一次.
小智 6
你可以像这样四处走动。
通过这种方式,除了单个分区的内容具有来自 headerDF 的一行标题名称之外,所有分区都没有标题。当所有分区合并在一起时,文件顶部有一个标题。示例代码如下
//dataFrame is the data to save on disk
//cast types of all columns to String
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
//create a new data frame containing only header names
import scala.collection.JavaConverters._
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
//merge header names with data
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
//use hadoop FileUtil to merge all partition csv files into a single file
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
Run Code Online (Sandbox Code Playgroud)
Kra*_*tam -4
// Convert JavaRDD to CSV and save as text file
outputDataframe.write()
.format("com.databricks.spark.csv")
// Header => true, will enable to have header in each file
.option("header", "true")
Run Code Online (Sandbox Code Playgroud)
请点击集成测试链接,了解如何编写单个标头
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/