使用spark-csv编写单个CSV文件

use*_*076 92 csv scala apache-spark spark-csv

我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个CSV,但不能,它正在创建一个文件夹.

需要一个Scala函数,它将获取路径和文件名等参数并写入该CSV文件.

zer*_*323 146

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的.如果您需要单个输出文件(仍然在文件夹中),您可以repartition(如果上游数据很大,但需要随机播放,则首选):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")
Run Code Online (Sandbox Code Playgroud)

或者coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")
Run Code Online (Sandbox Code Playgroud)

保存前的数据框:

所有数据都将被写入mydata.csv/part-00000.在使用此选项之前,请确保了解正在进行的操作以及将所有数据传输到单个工作程序的成本是多少.如果将分布式文件系统与复制一起使用,则会多次传输数据 - 首先将其提取到单个工作线程,然后再分布在存储节点上.

或者您可以留下您的代码,因为它是和使用像通用工具catHDFSgetmerge简单地合并之后的所有部件.

  • 你也可以使用coalesce:df.coalesce(1).write.format("com.databricks.spark.csv").option("header","true").save("mydata.csv") (5认同)
  • @Harsha我不说没有.如果你正确调整GC它应该工作得很好,但这只是浪费时间,很可能会损害整体性能.所以我个人认为没有任何理由特别困扰,因为在Spark之外合并文件并不用担心内存使用情况非常简单. (2认同)

Min*_*gan 34

如果您使用HDFS运行Spark,我一直在通过正常编写csv文件并利用HDFS进行合并来解决问题.我直接在Spark(1.6)中这样做:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()
Run Code Online (Sandbox Code Playgroud)

不记得我在哪里学到了这个技巧,但它可能适合你.

  • @SUDARSHAN我的上述函数适用于未压缩的数据.在您的示例中,我认为您在编写文件时使用gzip压缩 - 然后 - 尝试合并这些失败的文件.这不会起作用,因为您无法将gzip文件合并在一起.Gzip不是可拆分压缩算法,所以肯定不是"合法".您可能会测试"snappy"或"bz2"压缩 - 但直觉感觉这在合并时也会失败.可能最好的方法是删除压缩,合并原始文件,然后使用可拆分编解码器进行压缩. (4认同)
  • 这可以与S3数据存储一起使用吗? (2认同)

小智 26

我可能在这里玩游戏有点晚了,但是使用coalesce(1)repartition(1)可能适用于小型数据集,但是大型数据集将全部投入到一个节点上的一个分区中.这可能会导致OOM错误,或者充其量只会缓慢进行.

我强烈建议您使用FileUtil.copyMerge()Hadoop API中的函数.这会将输出合并为一个文件.

编辑 - 这有效地将数据带到驱动程序而不是执行程序节点.Coalesce()如果一个执行器有比驱动程序更多的RAM使用,那就没问题.

编辑2:copyMerge()正在Hadoop 3.0中删除.有关如何使用最新版本的更多信息,请参阅以下堆栈溢出文章:Hadoop如何在Hadoop 3.0中执行CopyMerge


Jos*_*der 15

如果您正在使用Databricks并且可以将所有数据放入一个工作线程的RAM中(因此可以使用.coalesce(1)),则可以使用dbfs查找并移动生成的CSV文件:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)
Run Code Online (Sandbox Code Playgroud)

如果您的文件不适合工作者的RAM,您可能需要考虑使用 chaotic3quilibrium的建议来使用FileUtils.copyMerge().我没有这样做,并且还不知道是否可能,例如,在S3上.

这个答案建立在此问题的先前答案以及我自己对提供的代码片段的测试之上.我最初将它发布到Databricks并在此重新发布.

我发现dbfs的rm递归选项的最佳文档是在Databricks论坛上.


ppr*_*009 12

spark 的df.write()API 将在给定路径内创建多个部分文件......强制 spark 只写入一个部分文件使用df.coalesce(1).write.csv(...)而不是df.repartition(1).write.csv(...)作为合并是一个狭窄的转换,而重新分区是一个广泛的转换,请参阅Spark - repartition() 与 coalesce()

df.coalesce(1).write.csv(filepath,header=True) 
Run Code Online (Sandbox Code Playgroud)

将使用一个part-0001-...-c000.csv文件在给定的文件路径中创建文件夹

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 
Run Code Online (Sandbox Code Playgroud)

有一个用户友好的文件名

  • 或者,如果数据帧不太大(~GB 或可以容纳驱动程序内存),您也可以使用“df.toPandas().to_csv(path)”,这将使用您首选的文件名写入单个 csv (4认同)
  • 呃,太令人沮丧了,这只能通过转换为熊猫来完成。只编写一个没有 UUID 的文件有多难? (3认同)

Pow*_*ers 12

此答案扩展了已接受的答案,提供了更多上下文,并提供了可以在计算机上的 Spark Shell 中运行的代码片段。

有关已接受答案的更多背景信息

接受的答案可能会给您留下示例代码输出单个mydata.csv文件的印象,但事实并非如此。我们来演示一下:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")
Run Code Online (Sandbox Code Playgroud)

这是输出的内容:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv
Run Code Online (Sandbox Code Playgroud)

NBmydata.csv是接受答案中的一个文件夹 - 它不是一个文件!

如何输出具有特定名称的单个文件

我们可以使用spark-daria写出单个mydata.csv文件。

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv
Run Code Online (Sandbox Code Playgroud)

这将输出文件如下:

Documents/
  better/
    mydata.csv
Run Code Online (Sandbox Code Playgroud)

S3 路径

您需要传递 s3a 路径才能DariaWriters.writeSingleFile在 S3 中使用此方法:

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)
Run Code Online (Sandbox Code Playgroud)

请参阅此处了解更多信息。

避免复制合并

copyMerge 已从 Hadoop 3 中删除。DariaWriters.writeSingleFile实现使用fs.rename如此处所述Spark 3 仍然使用 Hadoop 2,因此 copyMerge 实现将在 2020 年起作用。我不确定 Spark 何时升级到 Hadoop 3,但最好避免任何 copyMerge 方法,因为它会在 Spark 升级 Hadoop 时导致代码中断。

源代码

DariaWriters如果您想检查实现,请在 Spark-daria 源代码中 查找该对象。

PySpark 实施

使用 PySpark 写出单个文件更容易,因为您可以将 DataFrame 转换为默认情况下作为单个文件写出的 Pandas DataFrame。

Documents/
  better/
    mydata.csv
Run Code Online (Sandbox Code Playgroud)

局限性

ScalaDariaWriters.writeSingleFile方法和df.toPandas()Python 方法仅适用于小型数据集。巨大的数据集无法写成单个文件。从性能角度来看,将数据写为单个文件并不是最佳选择,因为数据无法并行写入。


小智 5

一个适用于 S3 的解决方案,由 Minkymorgan 修改而来。

只需将临时分区目录路径(名称与最终路径不同)传递为 ,并将srcPath单个最终 csv/txt 传递为destPath 指定deleteSource是否要删除原始目录。

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
Run Code Online (Sandbox Code Playgroud)


Kee*_*ker 5

我在 Python 中使用它来获取单个文件:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Run Code Online (Sandbox Code Playgroud)

  • 对于较小的数据,它就像一个魅力 :-D 并且你的文件不是奇怪的格式 :D (2认同)