如何通过spark插入HDFS?

hey*_*you 6 bigdata hdfs apache-spark apache-spark-sql

我已经在HDFS中对数据进行了分区。在某个时候,我决定对其进行更新。该算法是:

  • 从kafka主题中读取新数据。
  • 找出新数据的分区名称。
  • 从具有HDFS中这些名称的分区中加载数据。
  • 将HDFS数据与新数据合并。
  • 覆盖磁盘上已经存在的分区。

问题是,如果新数据具有磁盘上尚不存在的分区,该怎么办。在这种情况下,它们不会被写入。/sf/answers/3478406991/ <-例如,此解决方案不编写新分区。 在此处输入图片说明

上图描述了这种情况。让我们将左磁盘视为已经存在于HDFS中的分区,并将右磁盘视为刚刚从Kafka收到的分区。

正确磁盘的某些分区将与现有分区相交,而其他分区则不会。这段代码:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)
Run Code Online (Sandbox Code Playgroud)

无法将图片的蓝色部分写入磁盘。

那么,如何解决此问题?请提供代码。我正在寻找表演者。

那些不懂的人的例子:

假设我们在HDFS中有以下数据:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 1”

现在,我们收到以下新数据:

  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

因此,分区A和B在HDFS中,分区B和C是新分区,并且由于B在HDFS中,因此我们对其进行了更新。而且我想编写C。因此,最终结果应如下所示:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

但是,如果我使用上面的代码,则会得到以下信息:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”

因为overwrite dynamicspark 2.3 的新功能无法创建PartitionC。

更新:事实证明,如果您改为使用配置单元表,则可以使用。但是,如果您使用纯Spark,则不会...因此,我猜蜂巢的覆盖和Spark的覆盖工作有所不同。

hey*_*you 1

最后我决定从 HDFS 中删除分区的“绿色”子集,然后使用SaveMode.Append它。我认为这是火花中的一个错误。