外部覆盖后,Spark和Hive表架构不同步

hul*_*003 9 hive mapr apache-spark pyspark

我遇到的问题是Hive表的架构在使用Spark 2.1.0和Hive 2.1.1的Mapr集群上的Spark和Hive之间不同步.

我需要尝试专门为托管表解决此问题,但可以使用非托管/外部表重现该问题.

步骤概述

  1. saveAsTable一个数据帧保存到一个给定的表.
  2. 使用mode("overwrite").parquet("path/to/table")覆盖数据之前保存表.我实际上是通过Spark和Hive外部的进程修改数据,但这会重现同样的问题.
  3. 使用spark.catalog.refreshTable(...)刷新元
  4. 用表查询表spark.table(...).show().原始数据框和覆盖的数据框之间的任何列都将正确显示新数据,但不会显示仅在新表中的任何列.

db_name = "test_39d3ec9"
table_name = "overwrite_existing"
table_location = "<spark.sql.warehouse.dir>/{}.db/{}".format(db_name, table_name)

qualified_table = "{}.{}".format(db_name, table_name)
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(db_name))
Run Code Online (Sandbox Code Playgroud)

另存为托管表

existing_df = spark.createDataFrame([(1, 2)])
existing_df.write.mode("overwrite").saveAsTable(table_name)
Run Code Online (Sandbox Code Playgroud)

请注意,使用以下内容保存为非托管表将产生相同的问题:

existing_df.write.mode("overwrite") \
    .option("path", table_location) \
    .saveAsTable(qualified_table)
Run Code Online (Sandbox Code Playgroud)

查看表的内容

spark.table(table_name).show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+
Run Code Online (Sandbox Code Playgroud)

直接覆盖镶木地板文件

new_df = spark.createDataFrame([(3, 4, 5, 6)], ["_4", "_3", "_2", "_1"])
new_df.write.mode("overwrite").parquet(table_location)
Run Code Online (Sandbox Code Playgroud)

使用镶木地板阅读器查看内容,内容显示正确

spark.read.parquet(table_location).show()
+---+---+---+---+
| _4| _3| _2| _1|
+---+---+---+---+
|  3|  4|  5|  6|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

刷新表的spark元数据并再次作为表读入.将针对相同的列更新数据,但不显示其他列.

spark.catalog.refreshTable(qualified_table)
spark.table(qualified_table).show()
+---+---+
| _1| _2|
+---+---+
|  6|  5|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我还尝试spark.catalog.refreshTable在hive shell中使用下面的命令调用之前更新hive中的模式:

ALTER TABLE test_39d3ec9.overwrite_existing REPLACE COLUMNS (`_1` bigint, `_2` bigint, `_3` bigint, `_4` bigint);
Run Code Online (Sandbox Code Playgroud)

运行ALTER命令后,我运行describe,它在hive中正确显示

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint
_3                      bigint
_4                      bigint
Run Code Online (Sandbox Code Playgroud)

在运行alter命令之前,它仅按预期显示原始列

DESCRIBE test_39d3ec9.overwrite_existing
OK
_1                      bigint
_2                      bigint
Run Code Online (Sandbox Code Playgroud)

然后我跑了,spark.catalog.refreshTable但它没有影响火花的数据视图.

补充说明

从火花方面来看,我使用PySpark完成了大部分测试,但也在spark-shell(scala)和sparksql shell中进行了测试.虽然在火花壳我也试过使用HiveContext但没有工作.

import org.apache.spark.sql.hive.HiveContext
import spark.sqlContext.implicits._
val hiveObj = new HiveContext(sc)
hiveObj.refreshTable("test_39d3ec9.overwrite_existing")
Run Code Online (Sandbox Code Playgroud)

在hive shell中执行ALTER命令后,我在Hue中验证了架构也在那里发生了变化.

我也试过运行ALTER命令,spark.sql("ALTER ...")但我们所使用的Spark版本(2.1.0)不允许它,看起来它基于此问题直到Spark 2.2.0才可用:https:// issues .apache.org/JIRA /浏览/ SPARK-19261

我也通过火花文档阅读一遍,特别是这部分:https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#hive-metastore-parquet-table-conversion

基于这些文档,spark.catalog.refreshTable应该工作.配置spark.sql.hive.convertMetastoreParquet通常是false,但我把它切换到true测试,它似乎没有任何影响.

任何帮助将不胜感激,谢谢!

小智 2

我在 CDH 5.11.x 包中使用 Spark 2.2.0 时遇到了类似的问题。

spark.write.mode("overwrite").saveAsTable()当我发出后spark.read.table().show,将不会显示任何数据。

经过检查,我发现这是 CDH Spark 2.2.0 版本的一个已知问题。解决方法是在执行 saveAsTable 命令后运行以下命令。

spark.sql("ALTER TABLE qualified_table set SERDEPROPERTIES ('path'='hdfs://{hdfs_host_name}/{table_path}')")

spark.catalog.refreshTable("qualified_table")
Run Code Online (Sandbox Code Playgroud)

例如:如果您的表位置类似于hdfs://hdfsHA/user/warehouse/example.db/qualified_table
则分配'path'='hdfs://hdfsHA/user/warehouse/example.db/qualified_table'

这对我有用。试一试。我想现在你的问题已经解决了。如果没有你可以尝试这个方法。

解决方法来源: https://www.cloudera.com/documentation/spark2/2-2-x/topics/spark2_known_issues.html