写入和重新读取镶木地板文件时保留数据帧分区

wer*_*ner 5 apache-spark parquet

当我将具有定义分区的数据帧作为镶木地板文件写入磁盘,然后再次重新读取镶木地板文件时,分区丢失。有没有办法在写入和重新读取期间保留数据帧的原始分区?

示例代码

//create a dataframe with 100 partitions and print the number of partitions
val originalDf = spark.sparkContext.parallelize(1 to 10000).toDF().repartition(100)
println("partitions before writing to disk: " + originalDf.rdd.partitions.length)

//write the dataframe to a parquet file and count the number of files actually written to disk
originalDf.write.mode(SaveMode.Overwrite).parquet("tmp/testds")
println("files written to disk: " + new File("tmp/testds").list.size)

//re-read the parquet file into a dataframe and print the number of partitions 
val readDf = spark.read.parquet("tmp/testds")
println("partitions after reading from disk: " + readDf.rdd.partitions.length)
Run Code Online (Sandbox Code Playgroud)

打印出来

partitions before writing to disk: 100
files written to disk: 202
partitions after reading from disk: 4
Run Code Online (Sandbox Code Playgroud)

观察:

  • 第一个数字是预期结果,数据帧由 100 个分区组成
  • 第二个数字对我来说也不错:我得到了 100 个*.parquet文件、100 个*.parquet.crc 文件和两个_SUCCESS文件,所以 parquet 文件仍然由 100 个分区组成
  • 第三行显示再次读取parquet文件后,原来的分区丢失,读取parquet文件后的分区数量发生了变化。分区数和我的Spark集群的executor数有关
  • 无论我将 parquet 文件写入本地磁盘还是 Hdfs 存储,结果都是相同的
  • 当我运行一个动作上readDf,我可以在SparkUI看到,创建4个任务,当调用foreachPartitionreadDf的功能被执行四次

有没有办法repartition(100)在读取镶木地板文件后保留数据帧的原始分区而无需再次调用?

背景:在我的实际应用程序中,我用精心调整的分区编写了许多不同的数据集,我想恢复这些分区,而不必为每个数据帧单独记录分区在将它们写入磁盘时的样子。

我正在使用 Spark 2.3.0。


更新:Spark 2.4.6 和 3.0.0 的结果相同

Dan*_*bos 1

我们刚刚开源了这个问题的解决方案。您可以在https://github.com/lynxkite/lynxkite/blob/main/app/com/lynxanalytics/biggraph/partitioned_pa​​rquet /PartitionedParquet.scala 找到它。(它是大型 AGPL 应用程序的一部分,而不是库。对此感到抱歉。)

你像这样使用它:

//re-read the parquet file into a dataframe and print the number of partitions
val readDf = spark.read
  .format(com.lynxanalytics.biggraph.partitioned_parquet.PartitionedParquet.format)
  .option("partitions", 100)
  .load("tmp/testds")
println("partitions after reading from disk: " + readDf.rdd.partitions.length)
Run Code Online (Sandbox Code Playgroud)

输出:

partitions before writing to disk: 100
files written to disk: 202
partitions after reading from disk: 100
Run Code Online (Sandbox Code Playgroud)

您需要提供分区数量作为参数,因为无法通过查看文件来判断。我们看到文件从part-00000part-00099。也许原始表有 100 个分区。也许有 101 个,最后一个是空的。