太多的映射键导致spark中的内存不足异常

CRM*_*CRM 7 scala apache-spark

我有一个RDD 'inRDD'形式RDD[(Vector[(Int, Byte)], Vector[(Int, Byte)])]这是一个PairRDD(key,value)其中关键是Vector[(Int, Byte)]和值Vector[(Int, Byte)].

对于(Int, Byte)关键字段向量中的每个元素(Int, Byte),以及值字段向量中的每个元素,我希望在输出RDD中得到一个新的(键,值)对(Int, Int), (Byte, Byte).

这应该给我一个表格的RDD RDD[((Int, Int), (Byte, Byte))].

例如,inRDD内容可能是,像,

(Vector((3,2)),Vector((4,2))), (Vector((2,3), (3,3)),Vector((3,1))), (Vector((1,3)),Vector((2,1))), (Vector((1,2)),Vector((2,2), (1,2)))
Run Code Online (Sandbox Code Playgroud)

哪个会变成

((3,4),(2,2)), ((2,3),(3,1)), ((3,3),(3,1)), ((1,2),(3,1)), ((1,2),(2,2)), ((1,1),(2,2))
Run Code Online (Sandbox Code Playgroud)

我有以下代码.

val outRDD = inRDD.flatMap {                                        
    case (left, right) =>
    for ((ll, li) <- left; (rl, ri) <- right) yield {
        (ll,rl) -> (li,ri)
    }
}
Run Code Online (Sandbox Code Playgroud)

它适用于矢量尺寸较小的矢量inRDD.但是当向量中有很多元素时,我得到了out of memory exception.将可用内存增加到spark只能解决较小的输入,并且对于更大的输入会再次出现错误.看起来我正试图在内存中组装一个巨大的结构.我无法以任何其他方式重写此代码.

我已经实现了类似的逻辑,java in hadoop如下所示.

for (String fromValue : fromAssetVals) {
    fromEntity = fromValue.split(":")[0];
    fromAttr = fromValue.split(":")[1];
    for (String toValue : toAssetVals) {
        toEntity = toValue.split(":")[0];
        toAttr = toValue.split(":")[1];
        oKey = new Text(fromEntity.trim() + ":" + toEntity.trim());
        oValue = new Text(fromAttr + ":" + toAttr);
        outputCollector.collect(oKey, oValue);
    }
}
Run Code Online (Sandbox Code Playgroud)

但是当我在spark中尝试类似的东西时,我会得到嵌套的rdd异常.

我如何有效地做到这一点spark using scala

zer*_*323 3

好吧,如果笛卡尔积是唯一的选择,你至少可以让它变得更懒惰一点:

inRDD.flatMap { case (xs, ys) =>
  xs.toIterator.flatMap(x => ys.toIterator.map(y => (x, y)))
}
Run Code Online (Sandbox Code Playgroud)

您还可以在 Spark 级别处理此问题

import org.apache.spark.RangePartitioner

val indexed = inRDD.zipWithUniqueId.map(_.swap)
val partitioner = new RangePartitioner(indexed.partitions.size, indexed)
val partitioned = indexed.partitionBy(partitioner)

val lefts = partitioned.flatMapValues(_._1)
val rights = partitioned.flatMapValues(_._2)

lefts.join(rights).values
Run Code Online (Sandbox Code Playgroud)