saveAsTable和insertInto在不同的SaveMode中有什么区别?

y2k*_*ham 13 scala apache-spark apache-spark-sql

我试图写一个DataFrame进入Hive表(上S3中)Overwrite模式(需要我的应用程序),并需要DataFrameWriter(火花/斯卡拉)的两种方法之间做出选择.从我可以中读取文档,df.write.saveAsTable不同于df.write.insertInto在以下几个方面:

  • saveAsTable在使用基于位置的分辨率insertInto使用基于列名称的分辨率
  • 在附加模式下,saveAsTable更多关注现有表的底层模式以进行某些分辨率

总体而言,它给我的印象是,saveAsTable仅仅是一个更聪明的版本insertInto.或者,根据用例,人们可能更喜欢insertInto

但是这些方法中的每一种都有自己的一些注意事项,例如性能损失saveAsTable(因为它包含更多功能)?除了在文档中讲述(不是非常清楚)的内容之外,他们的行为还有其他差异吗?


编辑-1

文档说明了这一点 insertInto

将DataFrame的内容插入到指定的表中

这个 saveAsTable

在表已存在的情况下,此函数的行为取决于模式函数指定的保存模式

现在我可以列出我的怀疑

  • 是否insertInto总是期望表存在吗?
  • 不要SaveMode■找什么影响insertInto
  • 如果以上答案是肯定的,那么
    • 什么之间的区别saveAsTableSaveMode.AppendinsertInto给该表已经存在?
    • insertIntoSaveMode.Overwrite任何意义?

Jac*_*ski 21

免责声明我已经探索insertInto了一段时间,虽然我离这个领域的专家很远,但我分享的结果更好.

是否insertInto总是期望表存在吗?

(根据表名和数据库).

此外,并非所有表都可以插入,即(永久)表,临时视图或临时全局视图都可以,但不是:

  1. 一张桌子

  2. 基于RDD的表

SaveModes对insertInto有什么影响吗?

(这也是我最近的问题!)

是的,但只有SaveMode.Overwrite.在考虑insertInto其他3种保存模式之后没有多大意义(因为它只是插入数据集).

saveAsTable与SaveMode.Append和insertInto之间的差异是什么,因为该表已经存在?

这是一个非常好的问题!我会说没有,但让我们看一个例子(希望证明一些东西).

scala> spark.version
res13: String = 2.4.0-SNAPSHOT

sql("create table my_table (id long)")
scala> spark.range(3).write.mode("append").saveAsTable("my_table")
org.apache.spark.sql.AnalysisException: The format of the existing table default.my_table is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;
  at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:117)
  at org.apache.spark.sql.execution.datasources.PreprocessTableCreation$$anonfun$apply$2.applyOrElse(rules.scala:76)
...
scala> spark.range(3).write.insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|  2|
|  0|
|  1|
+---+
Run Code Online (Sandbox Code Playgroud)

insertInto和SaveMode.Overwrite有什么意义吗?

我认为这是因为它非常重视SaveMode.Overwrite.它只是重新创建目标表.

spark.range(3).write.mode("overwrite").insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|  1|
|  0|
|  2|
+---+

Seq(100, 200, 300).toDF.write.mode("overwrite").insertInto("my_table")
scala> spark.table("my_table").show
+---+
| id|
+---+
|200|
|100|
|300|
+---+
Run Code Online (Sandbox Code Playgroud)


San*_*hot 11

我想指出SPARKSaveAsTableinsertIntoSPARK之间的主要区别。

在分区表中overwriteSaveMode 在SaveAsTable和 的情况下工作不同insertInto

考虑下面的例子。我正在使用SaveAsTable方法创建分区表。

hive> CREATE TABLE `db.companies_table`(`company` string) PARTITIONED BY ( `id` date);
OK
Time taken: 0.094 seconds
Run Code Online (Sandbox Code Playgroud)
import org.apache.spark.sql._*
import spark.implicits._
import org.apache.spark.sql._

scala>val targetTable = "db.companies_table"

scala>val companiesDF = Seq(("2020-01-01", "Company1"), ("2020-01-02", "Company2")).toDF("id", "company")

scala>companiesDF.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

scala> spark.sql("select * from db.companies_table").show()
+--------+----------+
| company|        id|
+--------+----------+
|Company1|2020-01-01|
|Company2|2020-01-02|
+--------+----------+
Run Code Online (Sandbox Code Playgroud)

现在我要添加 2 个新行,其中包含 2 个新分区值。

scala> val companiesDF = Seq(("2020-01-03", "Company1"), ("2020-01-04", "Company2")).toDF("id", "company")

scala> companiesDF.write.mode(SaveMode.Append).partitionBy("id").saveAsTable(targetTable)

scala>spark.sql("select * from db.companies_table").show()

+--------+----------+                                                           
| company|        id|
+--------+----------+
|Company1|2020-01-01|
|Company2|2020-01-02|
|Company1|2020-01-03|
|Company2|2020-01-04|
+--------+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,表中添加了 2 个新行。

现在假设我想对Overwrite2020-01-02 数据进行分区。

scala> val companiesDF = Seq(("2020-01-02", "Company5")).toDF("id", "company")

scala>companiesDF.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)
Run Code Online (Sandbox Code Playgroud)

按照我们的逻辑,只有分区 2020-01-02 应该被覆盖,但情况SaveAsTable不同。它会覆盖输入表,如下所示。

scala> spark.sql("select * from db.companies_table").show()
+-------+----------+
|company|        id|
+-------+----------+
|Company5|2020-01-02|
+-------+----------+
Run Code Online (Sandbox Code Playgroud)

因此,如果我们只想使用SaveAsTable它来覆盖表中的某些分区是不可能的。

有关更多详细信息,请参阅此链接。 https://towardsdatascience.com/understanding-the-spark-insertinto-function-1870175c3ee9

  • 请注意,这只发生在托管表上。外部表可以使用配置`spark.sql.sources.partitionOverwriteMode`设置为动态然后刷新表(`msck修复表..`) (6认同)

小智 5

我最近开始将 Hive 脚本转换为 Spark,并且仍在学习中。

我注意到 saveAsTable 和 insertInto 的一个重要行为尚未讨论。

df.write.mode("overwrite").saveAsTable("schema.table") 删除现有表“schema.table”并根据“df”架构重新创建一个新表。现有表的架构变得无关紧要,并且不必与 df 匹配。我被这种行为所困扰,因为我现有的表是 ORC,而创建的新表是 parquet(Spark 默认)。

df.write.mode("overwrite").insertInto("schema.table") 不会删除现有表,并期望现有表的架构与 'df' 的架构匹配。

我使用这两个选项检查了表的创建时间并重申了该行为。

原始表存储为 ORC - 2019 年 9 月 4 日星期三 21:27:33 GMT

saveAsTable 之后(存储更改为 Parquet) - 2019 年 9 月 4 日星期三 21:56:23 GMT(创建时间已更改)

删除并重新创建原始表 (ORC) - 2019 年 9 月 4 日星期三 21:57:38 GMT

insertInto 之后(仍然是 ORC) - 2019 年 9 月 4 日星期三 21:57:38 GMT(创建时间未更改)