如何更改通过 Spark 写入的文件的 ZSTD 压缩级别?

bel*_*lce 10 compression apache-spark parquet zstd

Spark文档中指出默认的zstd压缩级别为1。https://spark.apache.org/docs/latest/configuration.html

我在spark-defaults.conf中将此属性设置为不同的值,

和代码里面像

val conf = new SparkConf(false)
conf.set("spark.io.compression.zstd.level", "22")
val spark = SparkSession.builder.config(conf).getOrCreate()
..
Run Code Online (Sandbox Code Playgroud)

多次读取相同的输入并使用 zstd 压缩以 parquet 格式保存/写入它根本不会改变输出文件的大小。如何在 Spark 中调整这一压缩级别?

ei-*_*rad 3

该参数spark.io.compression.zstd.level是关于用于压缩中间文件的编解码器 - 序列化 RDD、shuffle、广播、检查点。在大多数情况下,唯一重要的是压缩速度,因此默认值1将是最佳选择(也应该设置spark.io.compression.codeczstd,以使该参数生效)。

遗憾的是,无法为spark.sql.parquet.compression.codecSpark 中指定的 Parquet 编解码器指定压缩级别。

从Spark 3.2(带有parquet-mr>=1.12.0)开始,有parquet.compression.codec.zstd.level选项,但似乎不起作用:

In [5]: for i in [1, 5, 10]: df.write.option('parquet.compression.codec.zstd.level', i
   ...: ).parquet(f"test-{i}.parquet", compression='zstd', mode='overwrite')
                                                                                
In [6]: !du -sh test-*.parquet
40M test-10.parquet
40M test-1.parquet
40M test-5.parquet
Run Code Online (Sandbox Code Playgroud)

作为一种解决方法,可以使用项目中的 Parquet 实现arrow(直接使用 C++ 或通过 pyarrow / go / 等;它允许指定compression_level每列的编解码器以及默认compression_level值)在将数据写入之前重新打包数据仓库。

遗憾的是,arrow-rsParquet 实现也不允许指定compression_level。但幸运的是,parquet2它被用于arrow2(箭头的无变形 Rust 实现) -确实如此