相关疑难解决方法(0)

在spark数据帧写入方法中覆盖特定分区

我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)

其中df是具有要覆盖的增量数据的数据帧.

hdfs-base-path包含主数据.

当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.

我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?

apache-spark apache-spark-sql spark-dataframe

52
推荐指数
6
解决办法
5万
查看次数

如何通过spark插入HDFS?

我已经在HDFS中对数据进行了分区。在某个时候,我决定对其进行更新。该算法是:

  • 从kafka主题中读取新数据。
  • 找出新数据的分区名称。
  • 从具有HDFS中这些名称的分区中加载数据。
  • 将HDFS数据与新数据合并。
  • 覆盖磁盘上已经存在的分区。

问题是,如果新数据具有磁盘上尚不存在的分区,该怎么办。在这种情况下,它们不会被写入。/sf/answers/3478406991/ <-例如,此解决方案不编写新分区。 在此处输入图片说明

上图描述了这种情况。让我们将左磁盘视为已经存在于HDFS中的分区,并将右磁盘视为刚刚从Kafka收到的分区。

正确磁盘的某些分区将与现有分区相交,而其他分区则不会。这段代码:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)
Run Code Online (Sandbox Code Playgroud)

无法将图片的蓝色部分写入磁盘。

那么,如何解决此问题?请提供代码。我正在寻找表演者。

那些不懂的人的例子:

假设我们在HDFS中有以下数据:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 1”

现在,我们收到以下新数据:

  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

因此,分区A和B在HDFS中,分区B和C是新分区,并且由于B在HDFS中,因此我们对其进行了更新。而且我想编写C。因此,最终结果应如下所示:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”
  • PartitionC的数据为“ 1”

但是,如果我使用上面的代码,则会得到以下信息:

  • 分区A的数据为“ 1”
  • 分区B的数据为“ 2”

因为overwrite dynamicspark 2.3 的新功能无法创建PartitionC。

更新:事实证明,如果您改为使用配置单元表,则可以使用。但是,如果您使用纯Spark,则不会...因此,我猜蜂巢的覆盖和Spark的覆盖工作有所不同。

bigdata hdfs apache-spark apache-spark-sql

6
推荐指数
1
解决办法
1139
查看次数

如何从按月分区的 parquet 文件中删除特定月份

我有monthly过去 5 年的收入数据,并且我parquetappend模式但列的格式存储各个月份的数据帧。这是下面的伪代码 -partitioned by month

def Revenue(filename):
    df = spark.read.load(filename)
    .
    .
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')
Run Code Online (Sandbox Code Playgroud)

df每月以格式存储,parquet如下所示 -

在此输入图像描述

问:如何删除parquet特定月份对应的文件夹?

一种方法是将所有这些parquet文件加载​​到一个大文件中df,然后使用.where()子句过滤掉该特定月份,然后将其保存回模式月份parquet格式,如下所示 -partitionByoverwrite

# If we want to remove data from Feb, 2015
df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')
Run Code Online (Sandbox Code Playgroud)

但是,这种方法相当麻烦。

另一种方法是直接删除该特定月份的文件夹,但我不确定这是否是处理问题的正确方法,以免我们metadata以不可预见的方式更改。

parquet删除特定月份的数据的正确方法是什么?

python apache-spark parquet pyspark

6
推荐指数
1
解决办法
2万
查看次数