我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.
我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Run Code Online (Sandbox Code Playgroud)
至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.
但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.
源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.
有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?
我读了一下文档HashPartitioner.不幸的是,除了API调用之外没有解释太多.我假设HashPartitioner根据键的哈希对分布式集进行分区.例如,如果我的数据是这样的
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
Run Code Online (Sandbox Code Playgroud)
因此,分区器会将其放入不同的分区,同一个键落在同一个分区中.但是我不明白构造函数参数的意义
new HashPartitoner(numPartitions) //What does numPartitions do?
Run Code Online (Sandbox Code Playgroud)
对于上述数据集,如果我这样做,结果会有何不同
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
Run Code Online (Sandbox Code Playgroud)
那么HashPartitioner工作怎么样呢?
鉴于HashPartitioner文档说:
[HashPartitioner]使用Java的Object.hashCode实现基于散列的分区.
说我想DeviceData通过它进行分区kind.
case class DeviceData(kind: String, time: Long, data: String)
Run Code Online (Sandbox Code Playgroud)
RDD[DeviceData]通过覆盖deviceData.hashCode()方法并仅使用哈希码来分区是否正确kind?
但是考虑到HashPartitioner需要一些分区参数,我很困惑我是否需要事先知道种类的数量以及如果有多种类型而不是分区会发生什么?
如果我将分区数据写入磁盘,它会在读取时保持分区吗?
我的目标是致电
deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)
Run Code Online (Sandbox Code Playgroud)
并且在迭代器中只有DeviceData相同的kind值.
我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分为20个分区,用StateSpec.function(trackStateFunc _).numPartitions(20).创建.我曾希望在整个集群中分发状态,但似乎每个节点都保持完整状态,并且执行总是只执行一个节点.
Locality Level Summary: Node local: 50在每个批次的UI中显示,完整批次是随机读取.之后,我写信给Kafka,分区再次传遍整个集群.我似乎无法找出为什么mapWithState()需要在单个节点上运行.如果它被一个节点而不是整个集群限制,这是否会破坏分区状态的概念?难道不能按密钥分配状态吗?
我有这个简单的火花程序。我想知道为什么所有数据最终都在一个分区中。
val l = List((30002,30000), (50006,50000), (80006,80000),
(4,0), (60012,60000), (70006,70000),
(40006,40000), (30012,30000), (30000,30000),
(60018,60000), (30020,30000), (20010,20000),
(20014,20000), (90008,90000), (14,0), (90012,90000),
(50010,50000), (100008,100000), (80012,80000),
(20000,20000), (30010,30000), (20012,20000),
(90016,90000), (18,0), (12,0), (70016,70000),
(20,0), (80020,80000), (100016,100000), (70014,70000),
(60002,60000), (40000,40000), (60006,60000),
(80000,80000), (50008,50000), (60008,60000),
(10002,10000), (30014,30000), (70002,70000),
(40010,40000), (100010,100000), (40002,40000),
(20004,20000),
(10018,10000), (50018,50000), (70004,70000),
(90004,90000), (100004,100000), (20016,20000))
val l_rdd = sc.parallelize(l, 2)
// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)
// …Run Code Online (Sandbox Code Playgroud) 我有 RDD,其中每条记录都是 int:
[0,1,2,3,4,5,6,7,8]
Run Code Online (Sandbox Code Playgroud)
我需要做的就是将这个 RDD 分成几批。即制作另一个 RDD,其中每个元素都是固定大小的元素列表:
[[0,1,2], [3,4,5], [6,7,8]]
Run Code Online (Sandbox Code Playgroud)
这听起来微不足道,但是,我最近几天感到困惑,除了以下解决方案之外找不到任何东西:
使用 ZipWithIndex 枚举 RDD 中的记录:
[0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]
使用 map() 迭代这个 RDD 并计算索引 index = int(index / batchSize)
[1,2,3,4,5,6] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]
然后按生成的索引分组。
[(0, [0,1,2]), (1, [3,4,5])]
这将为我提供我需要的东西,但是,我不想在这里使用 group。当您使用普通 Map Reduce 或某些抽象(如 Apache Crunch)时,这很简单。但是有没有办法在不使用重分组的情况下在 Spark 中产生类似的结果?