在spark中保存固定大小的镶木地板输出文件

war*_*ner 4 apache-spark spark-dataframe

我有 160GB 的数据,在 DATE 列上分区并以在 spark 1.6.0 上运行的镶木地板文件格式存储。我需要在每个分区中存储具有相同大小文件的输出镶木地板文件,每个分区都有固定大小,比如每个 100MB。

我尝试使用以下代码:

  val blockSize= 1024*1024*100
  sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize)
  sc.hadoopConfiguration.setInt("parquet.block.size",blockSize)
Run Code Online (Sandbox Code Playgroud)

df1.write.partitionBy("DATE").parquet("output_file_path")

上面的配置不起作用,它正在创建具有默认分区数的多个文件,而不是 100 MB 的文件。

Ice*_*Man 6

不可能为每个文件获得完全相同的大小,但您可以向 Spark 提供足够的提示,使它们“在”特定大小范围内。一般目标是使每个文件等于 HDFS 块大小,并且每个文件包含一个(或多个)行组。您希望行组适合一个 HDFS 块。如果一个行组不适合一个块,您会遇到需要进行额外网络调用以读取另一个 HDFS 块以完全读取行组的情况。

为此,请执行以下操作:

  • 将 spark.sql.files.maxPartitionBytes 在 spark conf 中设置为 256 MB(等于您的 HDFS 块大小)
  • 将 Spark 中的 parquet writer 选项上的 parquet.block.size 设置为 256 MB。

tradesDF.write.option("parquet.block.size", 256 * 1024 * 1024)


lev*_*lev 1

您可以尝试以下方法:

首先,您应该估计数据中单行的大小。
很难做到准确(因为 parquet 文件也包含元数据),但您可以获取 1000 行数据,写入文件,并估计单行的大小

由此计算 100MB 可以容纳多少行:

N = 100MB / size_of_row 
Run Code Online (Sandbox Code Playgroud)

现在您可以创建一个附加列,每行都有一个存储桶 ID:

val df2 = df.withColumn("bucket", (rank.over(Window.partitionBy("DATE")) / N).cast(IntegerType))
Run Code Online (Sandbox Code Playgroud)

现在您可以按日期和存储桶重新分区数据:

df2
  .repartition($"DATE", $"bucket")
  .dropColumn("bucket")
  .write
  .parquet(...)
Run Code Online (Sandbox Code Playgroud)