使用单个标头合并Spark输出CSV文件

V. *_*mma 21 csv hadoop scala apache-spark

我想在AWS中创建一个数据处理管道,最终将处理后的数据用于机器学习.

我有一个Scala脚本,它从S3获取原始数据,处理它并使用Spark-CSV将其写入HDFS甚至S3 .如果我想使用AWS Machine Learning工具来训练预测模型,我想我可以使用多个文件作为输入.但是如果我想使用别的东西,我认为最好是收到一个CSV输出文件.

目前,因为我不希望使用的重新分配(1) ,也不合并(1)用于提高性能的目的,我已经使用了Hadoop的FS -getmerge手动测试,但它只是合并作业输出文件的内容,我遇到了一个小问题.我需要在数据文件中使用单行标题来训练预测模型.

如果我使用.option("header","true")spark-csv,那么它会将标头写入每个输出文件,并且在合并之后我在数据中有与输出文件一样多的标题行.但是如果header选项为false,则它不会添加任何标头.

现在我找到了一个选项,可以将Scala脚本中的文件与Hadoop API合并FileUtil.copyMerge.我尝试spark-shell使用下面的代码.

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Run Code Online (Sandbox Code Playgroud)

但是这个解决方案仍然只是将文件连接在一起,并且不处理头文件.如何获得只有一行标题的输出文件?

我甚至尝试添加df.columns.mkString(",")作为最后一个参数copyMerge,但这仍然多次添加标题,而不是一次.

小智 6

你可以像这样四处走动。

  • 1.创建一个包含标题名称的新 DataFrame(headerDF)。
  • 2.将它与包含数据的 DataFrame(dataDF) 联合起来。
  • 3.使用option("header", "false") 将合并后的 DataFrame 输出到磁盘。
  • 4.使用hadoop FileUtil合并分区文件(part-0000**0.csv)

通过这种方式,除了单个分区的内容具有来自 headerDF 的一行标题名称之外,所有分区都没有标题。当所有分区合并在一起时,文件顶部有一个标题。示例代码如下

  //dataFrame is the data to save on disk
  //cast types of all columns to String
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only header names
  import scala.collection.JavaConverters._
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //merge header names with data
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
Run Code Online (Sandbox Code Playgroud)


Kra*_*tam -4

 // Convert JavaRDD  to CSV and save as text file
        outputDataframe.write()
                .format("com.databricks.spark.csv")
                // Header => true, will enable to have header in each file
                .option("header", "true")
Run Code Online (Sandbox Code Playgroud)

请点击集成测试链接,了解如何编写单个标头

http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/