war*_*ner 4 apache-spark spark-dataframe
我有 160GB 的数据,在 DATE 列上分区并以在 spark 1.6.0 上运行的镶木地板文件格式存储。我需要在每个分区中存储具有相同大小文件的输出镶木地板文件,每个分区都有固定大小,比如每个 100MB。
我尝试使用以下代码:
val blockSize= 1024*1024*100
sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize)
sc.hadoopConfiguration.setInt("parquet.block.size",blockSize)
Run Code Online (Sandbox Code Playgroud)
df1.write.partitionBy("DATE").parquet("output_file_path")
上面的配置不起作用,它正在创建具有默认分区数的多个文件,而不是 100 MB 的文件。
不可能为每个文件获得完全相同的大小,但您可以向 Spark 提供足够的提示,使它们“在”特定大小范围内。一般目标是使每个文件等于 HDFS 块大小,并且每个文件包含一个(或多个)行组。您希望行组适合一个 HDFS 块。如果一个行组不适合一个块,您会遇到需要进行额外网络调用以读取另一个 HDFS 块以完全读取行组的情况。
为此,请执行以下操作:
tradesDF.write.option("parquet.block.size", 256 * 1024 * 1024)
您可以尝试以下方法:
首先,您应该估计数据中单行的大小。
很难做到准确(因为 parquet 文件也包含元数据),但您可以获取 1000 行数据,写入文件,并估计单行的大小
由此计算 100MB 可以容纳多少行:
N = 100MB / size_of_row
Run Code Online (Sandbox Code Playgroud)
现在您可以创建一个附加列,每行都有一个存储桶 ID:
val df2 = df.withColumn("bucket", (rank.over(Window.partitionBy("DATE")) / N).cast(IntegerType))
Run Code Online (Sandbox Code Playgroud)
现在您可以按日期和存储桶重新分区数据:
df2
.repartition($"DATE", $"bucket")
.dropColumn("bucket")
.write
.parquet(...)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8682 次 |
| 最近记录: |