Omk*_*kar 7 java csv dataframe apache-spark apache-spark-sql
我spark job在一个有2个工作节点的集群中运行!我使用下面的代码(spark java)将计算出的数据帧保存为csv到工作节点.
dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);
我试图了解spark如何在每个工作节点上写入多个部分文件.
Run1)worker1有part files和SUCCESS; worker2让_temporarty/task*/part*每个任务都运行部分文件.
Run2)worker1有部分文件和_temporary目录; worker2具有multiple part files
谁能帮助我理解为什么会出现这种行为?1)我是否应该将记录outputDir/_temporary作为输出文件的一部分与part files in outputDir?一起考虑?
2)_temporary 在作业运行后是否应该删除dir并将part文件移动到outputDir?
3)为什么不能直接在输出目录下创建零件文件?
coalesce(1)并且repartition(1)不能成为选项,因为outputDir文件本身就在附近500GB
Spark 2.0.2. 2.1.3 和 Java 8, no HDFS
经过分析,发现我的 Spark 作业正在使用fileoutputcommitter version 1默认的。fileoutputcommitter version 2然后我添加了要使用的配置,version 1并在 AWS 中的 10 节点 Spark 独立集群中进行了测试。都是part-* files直接在outputDirPath指定下生成的dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)
我们可以设置属性
--conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2'通过包含与中相同的内容spark-submit command
或使用sparkContext设置属性javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")
我理解Spark 文档中概述的失败情况的后果,但我达到了预期的结果!
Spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version,默认值为 1
文件输出提交者算法版本,有效算法版本号:1 或 2。版本 2 可能具有更好的性能,但版本 1 在某些情况下可能更好地处理故障,如根据 MAPREDUCE-4815。