在spark数据帧写入方法中覆盖特定分区

yat*_*tin 52 apache-spark apache-spark-sql spark-dataframe

我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)

其中df是具有要覆盖的增量数据的数据帧.

hdfs-base-path包含主数据.

当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.

我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?

Mad*_*llo 84

最后!这是Spark 2.3.0中的一个功能:https: //issues.apache.org/jira/browse/SPARK-20236

要使用它,您需要将spark.sql.sources.partitionOverwriteMode设置设置为dynamic,需要对数据集进行分区,并覆盖写入模式.例:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
Run Code Online (Sandbox Code Playgroud)

我建议您在写入之前根据分区列进行重新分区,这样每个文件夹最终不会有400个文件.

在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除这些分区,然后使用mode append编写它们.

  • 我很难找到使用它的设置,所以在这里留下参考:/sf/ask/3500456851/ (2认同)
  • @ y2k-shubham是的,使用`spark.sql('插入覆盖表TABLE_NAME分区(PARTITION_NAME = PARTITION_VALUE)您的选择声明)'至少适用于2.2,如果较早的版本支持,则不建议使用。 (2认同)

Sim*_*Sim 45

这是一个常见问题.Spark高达2.0的唯一解决方案是直接写入分区目录,例如,

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
Run Code Online (Sandbox Code Playgroud)

如果您在2.0之前使用Spark,则需要使用以下命令阻止Spark发出元数据文件(因为它们会破坏自动分区发现):

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Run Code Online (Sandbox Code Playgroud)

如果您在1.6.2之前使用Spark,则还需要删除该_SUCCESS文件/root/path/to/data/partition_col=value或其存在将破坏自动分区发现.(我强烈建议使用1.6.2或更高版本.)

您可以从我的Spark Summit关于Bulletproof Jobs的讨论中获得有关如何管理大型分区表的更多详细信息.

  • 关于那个的任何更新?saveToTable()是否会覆盖特定的分区?火花足够聪明,可以找出哪些分区被覆盖? (6认同)

Zac*_*ach 20

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
Run Code Online (Sandbox Code Playgroud)

这对我适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2)


Sur*_*ali 10

在 insertInto 语句中添加 'overwrite=True' 参数可以解决这个问题:

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
Run Code Online (Sandbox Code Playgroud)

默认情况下overwrite=False。将其更改为True允许我们覆盖dfpartioned_table 中包含的特定分区。这有助于我们避免用 覆盖partioned_table 的全部内容df


ver*_*idd 6

使用Spark 1.6 ......

HiveContext可以大大简化此过程.关键是您必须首先使用CREATE EXTERNAL TABLE定义了分区的语句在Hive中创建表.例如:

# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'
Run Code Online (Sandbox Code Playgroud)

从这里开始,假设您有一个Dataframe,其中包含特定分区(或多个分区)的新记录.您可以使用HiveContext SQL语句来执行INSERT OVERWRITE使用此Dataframe,这将仅覆盖Dataframe中包含的分区的表:

# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")
Run Code Online (Sandbox Code Playgroud)

注意:update_dataframe在此示例中,模式具有与目标test表的模式匹配的模式.

使用这种方法一个简单的错误就是跳过CREATE EXTERNAL TABLEHive中的步骤,然后使用Dataframe API的写入方法创建表.特别是对于基于Parquet的表,将不会适当地定义表以支持Hive的INSERT OVERWRITE... PARTITION功能.

希望这可以帮助.


Ska*_*ndy 6

使用 Scala 在 Spark 2.3.1 上对此进行了测试。上面的大多数答案都是写入 Hive 表。但是,我想直接写入disk,它external hive table在此文件夹的顶部有一个。

首先需要的配置

val sparkSession: SparkSession = SparkSession
      .builder
      .enableHiveSupport()
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
      .appName("spark_write_to_dynamic_partition_folders")
Run Code Online (Sandbox Code Playgroud)

这里的用法:

DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
Run Code Online (Sandbox Code Playgroud)