如何将PySpark中的表数据框导出到csv?

PyR*_*red 59 python dataframe export-to-csv apache-spark apache-spark-sql

我正在使用Spark 1.3.1(PySpark),我使用SQL查询生成了一个表.我现在有一个对象DataFrame.我想将此DataFrame对象(我将其称为"表")导出到csv文件,以便我可以操作它并绘制列.如何将DataFrame"表" 导出到csv文件?

谢谢!

zer*_*323 141

如果数据框适合驱动程序内存并且您想要保存到本地文件系统,则可以使用方法将Spark DataFrame转换为本地Pandas DataFrametoPandas,然后只需使用to_csv:

df.toPandas().to_csv('mycsv.csv')
Run Code Online (Sandbox Code Playgroud)

否则你可以使用spark-csv:

在Spark 2.0+中,您可以csv直接使用数据源:

df.write.csv('mycsv.csv')
Run Code Online (Sandbox Code Playgroud)

  • 如果你坚持使用单个输出文件,你应该可以使用`df.coalesce(1).write.csv('mycsv.csv')` (12认同)
  • 如果你有spark数据帧你可以使用`df.write.csv('/ tmp/lookatme /')`这将在`/ tmp/lookatme`中删除一组csv文件使用spark比在pandas中序列化要快得多.唯一的缺点是你最终会得到一组csv而不是一个csvs,如果目标工具不知道如何连接它们,你需要自己做. (6认同)
  • 使用 ``df.write.csv('mycsv.csv')``` 将 csv 导出到 hdfs 环境。我如何在本地环境中获取它? (2认同)

Sha*_*fiq 29

对于Apache Spark 2+,为了将数据帧保存到单个csv文件中.使用以下命令

query.repartition(1).write.csv("cc_out.csv", sep='|')
Run Code Online (Sandbox Code Playgroud)

1表明我只需要一个csv分区.你可以根据你的要求改变它.

  • 如下所示:https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD.repartition建议使用coalesce()而不是repartition()来提高性能( "如果要减少此RDD中的分区数,请考虑使用coalesce,这可以避免执行shuffle.") (4认同)
  • @Seastar:虽然合并可能在多个用例中具有优势,但您的评论不适用于这种特殊情况。如果您想在 hdfs(或其他)中拥有一个 .csv,您通常需要一个文件,而不是分布在集群中的数十个文件(执行“repartition(1)”的全部意义。您需要对数据进行混洗无论哪种方式,合并在更大的范围内都没有任何帮助。 (3认同)

jbo*_*chi 17

如果您不能使用spark-csv,您可以执行以下操作:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
Run Code Online (Sandbox Code Playgroud)

如果您需要处理带有无法使用的换行符或逗号的字符串.用这个:

import csv
import cStringIO

def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
Run Code Online (Sandbox Code Playgroud)


s51*_*510 9

使用 PySpark

Spark 3.0+ 中写入 csv 的最简单方法

sdf.write.csv("/path/to/csv/data.csv")
Run Code Online (Sandbox Code Playgroud)

这可以根据您正在使用的 Spark 节点的数量生成多个文件。如果您想将其放在单个文件中,请使用重新分区。

sdf.repartition(1).write.csv("/path/to/csv/data.csv")
Run Code Online (Sandbox Code Playgroud)

使用熊猫

如果你的数据不是太多并且可以保存在本地python中,那么你也可以使用pandas

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
Run Code Online (Sandbox Code Playgroud)

使用考拉

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)
Run Code Online (Sandbox Code Playgroud)


Gaz*_*tel 7

您需要将Dataframe重新划分为一个分区,然后以Unix文件系统格式定义文件的格式,路径和其他参数,然后就可以开始了,

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
Run Code Online (Sandbox Code Playgroud)

阅读有关重新分区功能的 更多信息阅读有关保存功能的更多信息

但是,重新分区是一项代价高昂的函数,并且toPandas()最糟糕。尝试在以前的语法中使用.coalesce(1)代替.repartition(1)以获得更好的性能。

阅读有关分区功能与合并功能的更多信息