将Spark数据帧写为带分区的CSV

Lio*_*ber 10 csv partitioning apache-spark apache-spark-sql

我正在尝试将一个数据帧写入到HDFS位置的spark中,我希望如果我添加partitionBy符号Spark将创建分区(类似于以Parquet格式编写)文件夹的形式

partition_column_name=partition_value
Run Code Online (Sandbox Code Playgroud)

(即partition_date=2016-05-03).为此,我运行了以下命令:

(df.write
    .partitionBy('partition_date')
    .mode('overwrite')
    .format("com.databricks.spark.csv")
    .save('/tmp/af_organic'))
Run Code Online (Sandbox Code Playgroud)

但是没有创建分区文件夹,知道我为了火花DF自动创建那些文件夹我应该做些什么?

谢谢,

zer*_*323 22

Spark 2.0.0+:

内置的csv格式支持开箱即用的分区,因此您应该能够简单地使用:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)
Run Code Online (Sandbox Code Playgroud)

不包括任何额外的包.

Spark <2.0.0:

此时(v1.4.0)spark-csv不支持partitionBy(请参阅databricks/spark-csv#123),但您可以调整内置源以实现您想要的效果.

您可以尝试两种不同的方法.假设您的数据相对简单(没有复杂的字符串并且需要字符转义)并且看起来或多或少像这样:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])
Run Code Online (Sandbox Code Playgroud)

您可以手动准备写入值:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)
Run Code Online (Sandbox Code Playgroud)

并使用textsource 编写

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

在更复杂的情况下,您可以尝试使用适当的CSV解析器以类似的方式预处理值,可以使用UDF或通过RDD进行映射,但这将显着更昂贵.

如果CSV格式不是硬盘要求,您也可以使用支持partitionBy开箱即用的JSON编写器:

df.write.partitionBy("k").json("/tmp/bar")
Run Code Online (Sandbox Code Playgroud)

以及读取时的分区发现.