For Spark SQL, how should we read data from a folder in HDFS, apply some modifications, and save the updated data back to the same HDFS folder with the Overwrite save mode, without getting a FileNotFoundException?
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()

val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a", "b", "c")

newDF.write.mode(SaveMode.Overwrite)
  .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
  .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
The FileNotFoundException is thrown when we read data from the HDFS directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data back to the same HDFS directory "d=2017-03-20":
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files …
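A workaround that is often suggested for overwriting the input path is to materialize the result somewhere else first, re-read it, and only then overwrite the source directory, so that the final write no longer depends on the files it is deleting. The sketch below is my own addition, not part of the original question; it reuses sparkSession and newDF from the snippet above, and the temporary path tmp_d=2017-03-20 is a hypothetical name:

import org.apache.spark.sql.SaveMode

val srcPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val tmpPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/tmp_d=2017-03-20" // hypothetical scratch location

// 1. Write the transformed data to a temporary directory; the source files stay untouched.
newDF.write.mode(SaveMode.Overwrite).parquet(tmpPath)

// 2. Re-read from the temporary directory so the plan no longer references the source files.
val materialized = sparkSession.read.parquet(tmpPath)

// 3. Overwriting the original directory is now safe; the temporary directory can be removed afterwards.
materialized.write.mode(SaveMode.Overwrite).parquet(srcPath)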
I have set the Apache Spark Maven dependency in my pom.xml as follows:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.1</version>
</dependency>
However, I found that this dependency pulls in "hadoop-client-1.0.4.jar" and "hadoop-core-1.0.4.jar", and when I run my program I get the error "org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4", which indicates that I need to switch the Hadoop version from 1.0.4 to 2.2.0.
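Before changing the pom, one way to confirm this diagnosis is to check which Hadoop artifacts Maven actually resolves (for example with mvn dependency:tree) and which jar the relevant class is loaded from at runtime. A minimal sketch of the runtime check, my own addition using only standard Java reflection:

// Prints the jar that org.apache.hadoop.ipc.RPC was loaded from;
// with the default spark-core 0.9.1 dependencies this should point at hadoop-core-1.0.4.jar.
val codeSource = classOf[org.apache.hadoop.ipc.RPC].getProtectionDomain.getCodeSource
println(if (codeSource != null) codeSource.getLocation else "loaded from the bootstrap classpath")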
Update:
Is the following the correct way to solve this problem?
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
</dependency>
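If the exclusions above take effect, the Hadoop client version that ends up on the classpath can be verified at runtime. A minimal sketch, my own addition (org.apache.hadoop.util.VersionInfo is part of the Hadoop client libraries):

import org.apache.hadoop.util.VersionInfo

// After swapping in hadoop-client 2.2.0 this should print 2.2.0 rather than 1.0.4.
println("Hadoop client version: " + VersionInfo.getVersion)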
Thanks in advance for your help.