从hive表中读取并使用spark sql写回来

Avi*_*Avi 8 hadoop scala apache-spark apache-spark-sql spark-dataframe

我正在使用Spark SQL读取Hive表并将其分配给scala val

val x = sqlContext.sql("select * from some_table")
Run Code Online (Sandbox Code Playgroud)

然后我正在使用数据帧x进行一些处理,最后得到一个数据帧y,它具有与表some_table一样的精确模式.

最后,我试图将y数据帧覆盖到同一个hive表some_table

y.write.mode(SaveMode.Overwrite).saveAsTable().insertInto("some_table")
Run Code Online (Sandbox Code Playgroud)

然后我收到错误

org.apache.spark.sql.AnalysisException:无法将覆盖插入到也从中读取的表中

我尝试创建一个insert sql语句并使用sqlContext.sql()触发它,但它也给了我同样的错误.

有什么办法可以绕过这个错误吗?我需要将记录插回到同一个表中.


嗨,我尝试按照建议做,但仍然得到相同的错误.

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
Run Code Online (Sandbox Code Playgroud)

che*_*aux 7

您应该首先将DataFrame保存y在临时表中

y.write.mode("overwrite").saveAsTable("temp_table")
Run Code Online (Sandbox Code Playgroud)

然后,您可以覆盖目标表中的行

val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")
Run Code Online (Sandbox Code Playgroud)

  • 是的,我已经用相同的逻辑处理了这种情况,但我觉得它不具有成本效益。您强调的一点,即写入磁盘,是我想跳过的部分。无论如何,感谢您的帮助@cheseaux。如果您在这方面找到了好的东西,请告诉我。干杯!! (2认同)

nsa*_*lar 5

实际上你也可以使用检查点来实现这一目标.由于它打破了数据沿袭,Spark无法检测到您正在同一个表中读取和覆盖:

 sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")
Run Code Online (Sandbox Code Playgroud)