如何将数据从Spark SQL导出到CSV

sha*_*nkS 42 hadoop export-to-csv hiveql apache-spark apache-spark-sql

此命令适用于HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;
Run Code Online (Sandbox Code Playgroud)

但是使用Spark SQL我收到了一个org.apache.spark.sql.hive.HiveQl堆栈跟踪错误:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable
Run Code Online (Sandbox Code Playgroud)

请指导我在Spark SQL中编写导出到CSV功能.

sag*_*sag 74

您可以使用以下语句以CSV格式写入数据帧的内容 df.write.csv("/data/home/csv")

如果需要将整个数据帧写入单个CSV文件,请使用 df.coalesce(1).write.csv("/data/home/sample.csv")

对于spark 1.x,您可以使用spark-csv将结果写入CSV文件

scala片段下面会有所帮助

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
Run Code Online (Sandbox Code Playgroud)

将内容写入单个文件

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
Run Code Online (Sandbox Code Playgroud)

  • 我试过你提到的合并的东西。它在指定路径上创建一个目录,其中包含一个“part”文件和一个名为“_SUCCESS”的文件。您知道一种实际上只获取一个文件的方法吗? (2认同)

Boe*_*ern 46

由于Spark 2.X spark-csv被集成为本机数据源.因此,必要的声明简化为(windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")
Run Code Online (Sandbox Code Playgroud)

或UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")
Run Code Online (Sandbox Code Playgroud)

  • 好的!`.mode( "覆盖").CSV( "/ VAR/out.csv")` (4认同)
  • 这应该是现在接受的答案. (3认同)
  • 在Spark 2.x中,它使用该名称创建目录。有什么帮助吗? (2认同)

Dmi*_*rov 29

上面使用spark-csv的答案是正确的,但是有一个问题 - 库根据数据帧分区创建了几个文件.这不是我们通常需要的.因此,您可以将所有分区合并为一个:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

并将lib的输出(名称"part-00000")重命名为所需的文件名.

此博客文章提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

  • `coalesce(1)`要求数据集适合单个机器的堆,并且在处理大型数据集时很可能会导致问题 (5认同)
  • 它应该是df.repartition.write而不是df.write.repartition吗? (2认同)
  • 如果希望继续写入现有文件,也可以添加模型.`resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header","true").save("s3:// .. ")`. (2认同)

Arn*_*-Oz 9

最简单的方法是映射DataFrame的RDD并使用mkString:

  df.rdd.map(x=>x.mkString(","))
Run Code Online (Sandbox Code Playgroud)

从Spark 1.5开始(或甚至在此之前) df.map(r=>r.mkString(",")),如果你想要CSV转义,你可以使用apache commons lang.例如,这是我们正在使用的代码

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
Run Code Online (Sandbox Code Playgroud)

  • 虽然这是最简单的答案(也是一个很好的答案),如果你的文字有双引号,你就必须考虑它们. (2认同)