我在纱线集群中有大RDD(1gb).在使用此群集的本地计算机上,我只有512 MB.我想在本地机器上迭代RDD中的值.我不能使用collect(),因为它会在本地创建太大的数组,这比我的堆更多.我需要一些迭代的方式.有方法iterator(),但它需要一些额外的信息,我无法提供.
UDP:提交给LocalIterator方法
Wil*_*ire 43
更新: RDD.toLocalIterator在写完原始答案后出现的方法是一种更有效的方式来完成工作.它用于runJob仅评估每个步骤上的单个分区.
TL; DR和原始答案可能会大致了解它是如何工作的:
首先,获取分区索引数组:
val parts = rdd.partitions
然后创建较小的rdds过滤掉除了单个分区之外的所有内容.从较小的rdds收集数据并迭代单个分区的值:
for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
    //The second argument is true to avoid rdd reshuffling
    val data = partRdd.collect //data contains all values from a single partition 
                               //in the form of array
    //Now you can do with the data whatever you want: iterate, save to a file, etc.
}
我没有尝试这个代码,但它应该工作.如果不能编译,请写评论.当然,它只有在分区足够小时才会起作用.如果不是,您可以随时增加分区数rdd.coalesce(numParts, true).
sam*_*est 14
Wildfire的回答似乎在语义上是正确的,但我确信你应该能够通过使用Spark的API来提高效率.如果你想依次处理每个分区,我不明白为什么你不能使用map/ filter/ reduce/ reduceByKey/ mapPartitions操作.你想要在一个阵列中的一个地方拥有所有东西的唯一一次是你要进行非单一操作 - 但这似乎不是你想要的.你应该可以这样做:
rdd.mapPartitions(recordsIterator => your code that processes a single chunk)
或这个
rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})
这与@Wildlife 建议的方法相同,但是用pyspark编写.
关于这种方法的好处 - 它允许用户按顺序访问RDD中的记录.我正在使用此代码将RDD中的数据提供给机器学习工具进程的STDIN.
rdd = sc.parallelize(range(100), 10)
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter
for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)
产生输出:
partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
| 归档时间: | 
 | 
| 查看次数: | 40777 次 | 
| 最近记录: |