我们正在运行spark 2.3.0 AWW EMR.以下DataFrame" df"非空且大小适中:
scala> df.count
res0: Long = 4067
Run Code Online (Sandbox Code Playgroud)
下面的代码工作正常写df到hdfs:
scala> val hdf = spark.read.parquet("/tmp/topVendors")
hdf: org.apache.spark.sql.DataFrame = [displayName: string, cnt: bigint]
scala> hdf.count
res4: Long = 4067
Run Code Online (Sandbox Code Playgroud)
但是,使用相同的代码写入本地parquet或csv文件最终结果为空:
df.repartition(1).write.mode("overwrite").parquet("file:///tmp/topVendors")
scala> val locdf = spark.read.parquet("file:///tmp/topVendors")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)
at scala.Option.getOrElse(Option.scala:121)
Run Code Online (Sandbox Code Playgroud)
我们可以看到它失败的原因:
ls -l /tmp/topVendors
total 0
-rw-r--r-- 1 hadoop hadoop 0 Jul 30 …Run Code Online (Sandbox Code Playgroud) 我spark job在一个有2个工作节点的集群中运行!我使用下面的代码(spark java)将计算出的数据帧保存为csv到工作节点.
dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);
我试图了解spark如何在每个工作节点上写入多个部分文件.
Run1)worker1有part files和SUCCESS; worker2让_temporarty/task*/part*每个任务都运行部分文件.
Run2)worker1有部分文件和_temporary目录; worker2具有multiple part files
谁能帮助我理解为什么会出现这种行为?1)我是否应该将记录outputDir/_temporary作为输出文件的一部分与part files in outputDir?一起考虑?
2)_temporary 在作业运行后是否应该删除dir并将part文件移动到outputDir?
3)为什么不能直接在输出目录下创建零件文件?
coalesce(1)并且repartition(1)不能成为选项,因为outputDir文件本身就在附近500GB
Spark 2.0.2. 2.1.3 和 Java 8, no HDFS