Mik*_*kov 11 snappy apache-spark parquet apache-spark-sql spark-dataframe
社区!
请帮助我了解如何使用 Spark 获得更好的压缩率?
让我描述一下案例:
我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,有84 列
我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:
~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]
Run Code Online (Sandbox Code Playgroud)
处理:
creator: parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
AVAILABLE: OPTIONAL INT64 R:0 D:1
...
row group 1: RC:6660100 TS:243047789 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]
Run Code Online (Sandbox Code Playgroud)
另一方面,没有重新分区或使用合并 - 大小仍然接近摄取数据大小。
展望未来,我做了以下事情:
读取数据集并将其写回
productDF
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "none")
.parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
Run Code Online (Sandbox Code Playgroud)读取数据集,重新分区并将其写回
productDF
.repartition(500)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "none")
.parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
Run Code Online (Sandbox Code Playgroud)结果:80 GB不带, 283 GB带重新分区,输出文件数量相同
80GB 镶木地板元示例:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
Run Code Online (Sandbox Code Playgroud)
283 GB 镶木地板元示例:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
Run Code Online (Sandbox Code Playgroud)
看来,即使没有未压缩的数据,镶木地板本身(带编码?)也大大减少了数据的大小。如何 ?:)
我尝试读取未压缩的 80GB,重新分区并写回 - 我有 283 GB
我的第一个问题是为什么在火花重新分区/洗牌后我的尺寸越来越大?
第二个是如何有效地打乱 spark 中的数据以有利于 parquet 编码/压缩(如果有的话)?
一般来说,我不希望我的数据大小在火花处理后增长,即使我没有改变任何东西。
另外,我没有找到snappy是否有任何可配置的压缩率,例如 -1 ... -9?据我所知,gzip 有这个,但是在 Spark/Parquet writer 中控制这个速率的方法是什么?
感谢您的帮助!
谢谢!
小智 5
当您调用repartition(n)数据帧时,您正在进行循环分区。重新分区之前存在的任何数据局部性都消失了,熵增加了。因此,运行长度和字典编码器以及压缩编解码器实际上并没有太多可使用的。
所以当你重新分区时,你需要使用repartition (n, col)版本。给它一个可以保留数据局部性的好列。
此外,由于您可能正在优化下游作业的 sqooped 表,因此您可以sortWithinPartition加快扫描速度。
df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)