yat*_*tin 52 apache-spark apache-spark-sql spark-dataframe
我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:
df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)
其中df是具有要覆盖的增量数据的数据帧.
hdfs-base-path包含主数据.
当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.
我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?
Mad*_*llo 84
最后!这是Spark 2.3.0中的一个功能:https: //issues.apache.org/jira/browse/SPARK-20236
要使用它,您需要将spark.sql.sources.partitionOverwriteMode设置设置为dynamic,需要对数据集进行分区,并覆盖写入模式.例:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
Run Code Online (Sandbox Code Playgroud)
我建议您在写入之前根据分区列进行重新分区,这样每个文件夹最终不会有400个文件.
在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除这些分区,然后使用mode append编写它们.
Sim*_*Sim 45
这是一个常见问题.Spark高达2.0的唯一解决方案是直接写入分区目录,例如,
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
Run Code Online (Sandbox Code Playgroud)
如果您在2.0之前使用Spark,则需要使用以下命令阻止Spark发出元数据文件(因为它们会破坏自动分区发现):
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Run Code Online (Sandbox Code Playgroud)
如果您在1.6.2之前使用Spark,则还需要删除该_SUCCESS文件/root/path/to/data/partition_col=value或其存在将破坏自动分区发现.(我强烈建议使用1.6.2或更高版本.)
您可以从我的Spark Summit关于Bulletproof Jobs的讨论中获得有关如何管理大型分区表的更多详细信息.
Zac*_*ach 20
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")
Run Code Online (Sandbox Code Playgroud)
这对我适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2)
Sur*_*ali 10
在 insertInto 语句中添加 'overwrite=True' 参数可以解决这个问题:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)
Run Code Online (Sandbox Code Playgroud)
默认情况下overwrite=False。将其更改为True允许我们覆盖dfpartioned_table 中包含的特定分区。这有助于我们避免用 覆盖partioned_table 的全部内容df。
使用Spark 1.6 ......
HiveContext可以大大简化此过程.关键是您必须首先使用CREATE EXTERNAL TABLE定义了分区的语句在Hive中创建表.例如:
# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'
Run Code Online (Sandbox Code Playgroud)
从这里开始,假设您有一个Dataframe,其中包含特定分区(或多个分区)的新记录.您可以使用HiveContext SQL语句来执行INSERT OVERWRITE使用此Dataframe,这将仅覆盖Dataframe中包含的分区的表:
# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")
Run Code Online (Sandbox Code Playgroud)
注意:update_dataframe在此示例中,模式具有与目标test表的模式匹配的模式.
使用这种方法一个简单的错误就是跳过CREATE EXTERNAL TABLEHive中的步骤,然后使用Dataframe API的写入方法创建表.特别是对于基于Parquet的表,将不会适当地定义表以支持Hive的INSERT OVERWRITE... PARTITION功能.
希望这可以帮助.
使用 Scala 在 Spark 2.3.1 上对此进行了测试。上面的大多数答案都是写入 Hive 表。但是,我想直接写入disk,它external hive table在此文件夹的顶部有一个。
首先需要的配置
val sparkSession: SparkSession = SparkSession
.builder
.enableHiveSupport()
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
.appName("spark_write_to_dynamic_partition_folders")
Run Code Online (Sandbox Code Playgroud)
这里的用法:
DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
48027 次 |
| 最近记录: |