I am using Java-Spark (Spark 2.2.0).
I am trying to drop a Hive partition as follows:
spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"
and I get the following exception:
org.apache.spark.sql.catalyst.parser.ParseException: mismatched input '<' expecting {')', ','}(line 1, pos 42)
I know there is an open issue (ALTER TABLE DROP PARTITION should support comparators) that was supposed to be fixed in my version, but I still hit the exception.
What is the alternative way to drop partitions from Spark? Is there another implementation?
Thanks.
It seems there is no way to do this for now. As SPARK-14922 shows, the target version for this fix is 3.0.0 and it is still in progress.
So I think there are two possible workarounds.
Let's set up the problem using Spark 2.4.3:
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).foreach(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records
Now... trying to drop a single partition from within Spark works!
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records
Trying to drop multiple partitions does not work.
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^
Dropping a range of partitions using a comparison operator does not work either.
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^
Presumably this happens because the partition column is a string and we are using comparison operators.
The solution I found was to pass a sequence of Map[String,String] (a TablePartitionSpec) to the catalog's dropPartitions function.
Step 1:
// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of partitions
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
.map(t => Map(t._1 -> t._2))
Step 2:
We pass each tuple of parameters to a separate ALTER TABLE DROP PARTITION statement.
filteredPartitions.flatten.foreach(t =>
spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records
Or pass them to the catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records
I hope this helps. Let me know if you have a better solution for Spark 2.x.
You can do the same thing programmatically in Spark. Also, for reference, this is not fixed in Spark 2.0, 2.1, or 2.2 either: https://issues.apache.org/jira/browse/SPARK-14922
Steps:
1. Create a Hive context.
2. Get the table via the Hive context's getTable method; you need to pass the dbName, the tableName, and a boolean that controls whether to raise an error.
3. From the table object, hive.getPartitions(table) gets you the partitions from the Hive context (you need to decide which partitions you are going to delete).
4. You can remove partitions using dropPartition with the partition values, table name, and db info (hive.dropPartition).
hiveContext.getPartitions(table)
hiveContext.dropPartition(dbName, tableName, partition.getValues(), true)
You need to validate each partition name and check whether it should be deleted or not (you need to write a custom method); a sketch of these steps follows below.
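A minimal Scala sketch of those four steps, assuming the "hive context" above refers to Hive's metadata client (org.apache.hadoop.hive.ql.metadata.Hive) and reusing the potato table from the first answer; the filter condition is an illustrative stand-in for your custom validation method:

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import scala.collection.JavaConverters._

// Step 1: get Hive's metadata client (assumed to be the "hive context" above).
val hive = Hive.get(new HiveConf())

// Step 2: dbName, tableName, and a boolean that makes getTable throw if the table is missing.
val table = hive.getTable("default", "potato", true)

// Step 3: list all partitions and pick the ones to delete
// (the same hour <= "2020-06-07T03" condition used earlier).
val toDrop = hive.getPartitions(table).asScala
  .filter(_.getValues.asScala.head <= "2020-06-07T03")

// Step 4: drop each partition by its values; the final `true` also deletes the data.
toDrop.foreach(p => hive.dropPartition("default", "potato", p.getValues, true))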
Or you can get the partition list in SQL using SHOW PARTITIONS, and from there use DROP PARTITION to remove each one, as in the sketch below.
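For example, again reusing the potato table (a sketch that assumes a single partition column named hour, so each SHOW PARTITIONS row looks like hour=2020-06-07T01):

import spark.implicits._

// List the partitions via SQL and keep the ones matching the condition.
val hours = spark.sql("SHOW PARTITIONS potato").as[String].collect()
  .map(_.stripPrefix("hour="))
  .filter(_ <= "2020-06-07T03")

// Drop each matching partition with its own DROP PARTITION statement.
hours.foreach(h =>
  spark.sql(s"ALTER TABLE potato DROP IF EXISTS PARTITION (hour='$h')"))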
This may give you some pointers .