Spark SQL SaveMode.Overwrite: getting java.io.FileNotFoundException and a request to 'REFRESH TABLE tableName'

fau*_*sun 11 apache-spark apache-spark-sql spark-dataframe

With Spark SQL, how should we read data from a folder in HDFS, make some modifications, and save the updated data back to the same HDFS folder with Overwrite save mode, without getting a FileNotFoundException?

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")

newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works

The FileNotFoundException happens when we read data from the HDFS directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data back to that same HDFS directory "d=2017-03-20":

Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
  ... 8 more

The following attempts still hit the same error. How should I solve this problem with Spark SQL? Thanks!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

val df = sparkSession.read.parquet(hdfsDirPath)

val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

小智 10

I solved this problem: first I write my DataFrame to a temporary directory, then I delete the source directory I read from, and finally I rename the temporary directory to the source name. QAQ
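A minimal sketch of that write-to-temp-then-rename approach, using the Hadoop FileSystem API (the srcPath/tmpPath names are illustrative, and reusing newDF and sparkSession from the question is an assumption):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

// Illustrative paths; adjust to your own HDFS layout
val srcPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val tmpPath = srcPath + "_tmp"

// 1. Write the transformed DataFrame to a temporary directory
newDF.write.mode(SaveMode.Overwrite).parquet(tmpPath)

// 2. Delete the source directory, then rename the temp directory to the source name
val fs = new Path(srcPath).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
fs.delete(new Path(srcPath), true) // recursive delete
fs.rename(new Path(tmpPath), new Path(srcPath))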


uh_*_*boi 5

Why not just cache it after reading it? Saving it to another directory and then moving that directory may require some extra permissions. I have also been forcing an action, for example show().

val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", ",")
    .load("/directory/tofile/")


myDF.cache()
myDF.show(2)
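Applied to the original Parquet scenario, a sketch of that idea might look like this (one assumption on top of the answer: count() is used instead of show(2), so every partition is scanned and actually cached before the source files are overwritten):

import org.apache.spark.sql.SaveMode

val df = sparkSession.read.parquet(hdfsDirPath)
df.cache()  // mark the DataFrame for caching
df.count()  // an action that scans all partitions, materializing the cache
df.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)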

  • "cache()" is an alias for "persist(StorageLevel.MEMORY_ONLY)", which may not be great for datasets larger than cluster memory. persist(StorageLevel.MEMORY_AND_DISK) would be a better solution, since it spills data to local disk when memory runs short. I have used "persist" before, but it didn't seem to work (2 upvotes)
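For reference, a short sketch of the storage level the comment suggests, applied to the answer's myDF (StorageLevel.MEMORY_AND_DISK is the Spark constant name):

import org.apache.spark.storage.StorageLevel

myDF.persist(StorageLevel.MEMORY_AND_DISK)
myDF.count() // force an action so the data is materialized under the chosen level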