我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.
我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Run Code Online (Sandbox Code Playgroud)
至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.
但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.
源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.
有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?
我正在尝试为最终用户创建一个聚合文件,以避免让他们使用更大的文件处理多个源.要做到这一点,我:A)遍历所有源文件夹,删除最常请求的12个字段,在这些结果共处的新位置旋转镶木地板文件.B)我尝试返回在步骤A中创建的文件,并通过按12个字段分组重新聚合它们,以将其减少为每个唯一组合的摘要行.
我发现的是,步骤A减少了有效载荷5:1(大约250演出成为48.5演出).然而,步骤B,而不是进一步减少这一点,比步骤A增加50%.但是,我的计数匹配.
这是使用Spark 1.5.2
我的代码,仅修改为使用field1 ... field12替换字段名称以使其更具可读性,下面是我已经注意到的结果.
虽然我不一定期望再减少5:1,但我不知道我做错了什么来增加存储端以减少具有相同模式的行.有谁能帮我理解我做错了什么?
谢谢!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 …Run Code Online (Sandbox Code Playgroud)