相关疑难解决方法(0)

如何在Spark中分区和写入DataFrame而不删除没有新数据的分区?

我试图DataFrame用Parquet格式保存到HDFS,使用DataFrameWriter三列值进行分区,如下所示:

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Run Code Online (Sandbox Code Playgroud)

正如提到的这个问题,partitionBy将在删除分区的全部现有层次path,并在分区取而代之dataFrame.由于特定日期的新增量数据将定期出现,我想要的是仅替换层次结构中dataFrame具有数据的那些分区,而保持其他分区不变.

要做到这一点,似乎我需要使用其完整路径单独保存每个分区,如下所示:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Run Code Online (Sandbox Code Playgroud)

但是我无法理解将数据组织到单分区中DataFrame的最佳方法,以便我可以使用它们的完整路径将它们写出来.一个想法是这样的:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Run Code Online (Sandbox Code Playgroud)

foreachPartition操作上Iterator[Row]是不理想的写出来镶木格式.

我还考虑使用a select...distinct eventdate, hour, processtime获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到完整的分区路径.但是,每个分区的独特查询加过滤器似乎效率不高,因为它会进行大量的过滤/写入操作.

我希望有一种更简洁的方法来保留dataFrame没有数据的现有分区?

谢谢阅读.

Spark版本:2.1

partitioning apache-spark parquet spark-dataframe

25
推荐指数
3
解决办法
3万
查看次数

仅覆盖分区的spark数据集中的某些分区

我们如何覆盖分区数据集,但只覆盖我们要更改的分区?例如,重新计算上周的日常工作,并且只覆盖上周的数据.

默认的Spark行为是覆盖整个表,即使只写一些分区.

hive apache-spark apache-spark-dataset

15
推荐指数
2
解决办法
5097
查看次数

使用spark覆盖hive分区

我正在使用AWS,我有使用Spark和Hive的工作流程.我的数据按日期分区,所以每天我在S3存储中都有一个新分区.我的问题是有一天加载数据失败并且我必须重新执行该分区.写的代码是下一个:

df                            // My data in a Dataframe
  .write
  .format(getFormat(target))  // csv by default, but could be parquet, ORC...
  .mode(getSaveMode("overwrite"))  // Append by default, but in future it should be Overwrite
  .partitionBy(partitionName) // Column of the partition, the date
  .options(target.options)    // header, separator...
  .option("path", target.path) // the path where it will be storage
  .saveAsTable(target.tableName)  // the table name
Run Code Online (Sandbox Code Playgroud)

我的流程会发生什么?如果我使用SaveMode.Overwrite,整个表将被删除,我将只保存分区.如果我使用SaveMode.Append我可能有重复的数据.

进行搜索,我发现Hive支持这种覆盖,只有分区,但是使用hql语句,我没有它.

我们需要Hive上的解决方案,所以我们不能使用这个替代选项(直接到csv).

我发现这张Jira票据可以解决我遇到的问题,但尝试使用最新版本的Spark(2.3.0),情况是一样的.它删除整个表并保存分区,而不是覆盖我的数据所具有的分区.

为了使这更清楚,这是一个例子:

由A分区

数据:

| A | B | C | 
|---|---|---| 
| b | …
Run Code Online (Sandbox Code Playgroud)

hadoop hive scala amazon-web-services apache-spark

10
推荐指数
1
解决办法
4637
查看次数

如何通过spark插入HDFS?

我已经在HDFS中对数据进行了分区。在某个时候,我决定对其进行更新。该算法是:

  • 从kafka主题中读取新数据。
  • 找出新数据的分区名称。
  • 从具有HDFS中这些名称的分区中加载数据。
  • 将HDFS数据与新数据合并。
  • 覆盖磁盘上已经存在的分区。

问题是,如果新数据具有磁盘上尚不存在的分区,该怎么办。在这种情况下,它们不会被写入。/sf/answers/3478406991/ <-例如,此解决方案不编写新分区。 在此处输入图片说明

上图描述了这种情况。让我们将左磁盘视为已经存在于HDFS中的分区,并将右磁盘视为刚刚从Kafka收到的分区。

正确磁盘的某些分区将与现有分区相交,而其他分区则不会。这段代码:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)
Run Code Online (Sandbox Code Playgroud)

无法将图片的蓝色部分写入磁盘。

那么,如何解决此问题?请提供代码。我正在寻找表演者。

那些不懂的人的例子:

假设我们在HDFS中有以下数据:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 1”

现在,我们收到以下新数据:

  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

因此,分区A和B在HDFS中,分区B和C是新分区,并且由于B在HDFS中,因此我们对其进行了更新。而且我想编写C。因此,最终结果应如下所示:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

但是,如果我使用上面的代码,则会得到以下信息:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”

因为overwrite dynamicspark 2.3 的新功能无法创建PartitionC。

更新:事实证明,如果您改为使用配置单元表,则可以使用。但是,如果您使用纯Spark,则不会...因此,我猜蜂巢的覆盖和Spark的覆盖工作有所不同。

bigdata hdfs apache-spark apache-spark-sql

6
推荐指数
1
解决办法
1139
查看次数