为什么Spark Parquet文件的聚合大于原始聚合?

Ste*_*rew 5 storage aggregation apache-spark parquet

我正在尝试为最终用户创建一个聚合文件,以避免让他们使用更大的文件处理多个源.要做到这一点,我:A)遍历所有源文件夹,删除最常请求的12个字段,在这些结果共处的新位置旋转镶木地板文件.B)我尝试返回在步骤A中创建的文件,并通过按12个字段分组重新聚合它们,以将其减少为每个唯一组合的摘要行.

我发现的是,步骤A减少了有效载荷5:1(大约250演出成为48.5演出).然而,步骤B,而不是进一步减少这一点,比步骤A增加50%.但是,我的计数匹配.

这是使用Spark 1.5.2
我的代码,仅修改为使用field1 ... field12替换字段名称以使其更具可读性,下面是我已经注意到的结果.

虽然我不一定期望再减少5:1,但我不知道我做错了什么来增加存储端以减少具有相同模式的行.有谁能帮我理解我做错了什么?

谢谢!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

zer*_*323 5

一般来说,像Parquet这样的柱状存储格式在数据分发(数据组织)和各列的基数方面非常敏感.数据越有条理,基数越低,存储效率越高.

聚合,就像你应用的那样,必须改组数据.当您检查执行计划时,您将看到它正在使用散列分区程序.这意味着聚合分布后的效率可能低于原始数据的分配效率.同时sum可以减少行数但增加rCount列的基数.

您可以尝试使用不同的工具来解决这个问题,但Spark 1.5.2中并不是所有工具都可用:

  • 按基数较低的列对完整数据集进行排序(由于完全洗牌而非常昂贵)或sortWithinPartitions.
  • 使用partitionBy的方法DataFrameWriter来划分使用低基数列的数据.
  • (Spark 2.0.0+)的使用bucketBysortBy方法,DataFrameWriter使用存储和本地排序来改善数据分发.

  • 但是为什么基于低基数的排序有助于压缩呢?如果我理解正确,排序有助于运行长度编码,而相对较低基数的列是使用字典编码压缩的 - 那么为什么排序很重要?除非...将这些值分组在一起有助于推断基数较低,并且如果它们是分布式的,则可能不会发生这种推断(这是一个问题,而不是陈述) (2认同)