如何将RDD拆分为两个或更多RDD?

Car*_*cas 35 apache-spark rdd pyspark

我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.

如果您熟悉SAS,请执行以下操作:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;
Run Code Online (Sandbox Code Playgroud)

这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......

zer*_*323 58

从单个转换*中生成多个RDD是不可能的.如果要拆分RDD,则必须filter为每个拆分条件应用a .例如:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Run Code Online (Sandbox Code Playgroud)

如果你只有一个二元条件并且计算成本很高,你可能更喜欢这样的东西:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
Run Code Online (Sandbox Code Playgroud)

它意味着只有一个谓词计算,但需要额外传递所有数据.

重要的是要注意,只要输入RDD被正确缓存并且没有关于数据分布的额外假设,当涉及重复过滤器和具有嵌套if-else的for循环之间的时间复杂度时,没有显着差异.

对于N个元素和M条件,您必须执行的操作数明显与N次M成比例.在for循环的情况下,它应该更接近(N + MN)/ 2并且重复过滤器恰好是NM但是在结束时那天它只不过是O(NM).你可以和Jason Lenderman一起看看我的讨论**,了解一些利弊.

在很高的层次上你应该考虑两件事:

  1. Spark转换是懒惰的,直到你执行一个动作你的RDD没有实现

    为什么这有关系?回到我的例子:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    
    Run Code Online (Sandbox Code Playgroud)

    如果以后我决定我只需要rdd_odd那么没有理由实现rdd_even.

    如果您看一下您的SAS示例来计算,work.split2您需要实现输入数据和work.split1.

  2. RDD提供声明性API.当您使用filtermap完全由Spark引擎完成此操作时.只要传递给转换的函数是免费的副作用,它就会创建多种优化整个管道的可能性.

在一天结束时,这个案例并不足以证明自己的转型是正确的.

带有过滤器模式的映射实际上用在核心Spark中.请参阅我对Sparks RDD.randomSplit如何实际拆分RDD以及该方法的相关部分的回答randomSplit.

如果唯一的目标是在输入上实现拆分,则可以使用partitionBy以下DataFrameWriter文本输出格式的子句:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)
Run Code Online (Sandbox Code Playgroud)

*Spark中只有3种基本类型的转换:

  • RDD [T] => RDD [T]
  • RDD [T] => RDD [U]
  • (RDD [T],RDD [U])=> RDD [W]

其中T,U,W可以是原子类型或产品 /元组(K,V).必须使用上述的某些组合来表达任何其他操作.您可以查看原始RDD文件了解更多详情.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

***另请参阅Scala Spark:拆分收集到几个RDD?


eje*_*eje 7

与上面提到的其他海报一样,没有单一的原生RDD变换可以分割RDD,但是这里有一些"多路复用"操作可以有效地模拟RDD上的各种"分裂",而无需多次读取:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

一些特定于随机拆分的方法:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

方法可从开源silex项目获得:

https://github.com/willb/silex

一篇博文解释了它们的工作原理:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
Run Code Online (Sandbox Code Playgroud)

正如其他地方所提到的,这些方法确实涉及内存的速度折衷,因为它们通过"急切地"而不是"懒惰地"计算整个分区结果来进行操作.因此,这些方法可能会在大型分区上遇到内存问题,而更大的传统惰性转换则不会.

  • 在另一个答案中重新说明对话的一部分是值得的:多路复用允许通过单程计算提高效率,但是它通过将结果存储在"非惰性"容器中来实现,因此(取决于计算的内容)与传统的多通道变量相比,可以增加常驻内存,其中计算可能是懒惰的.换句话说,多路复用购买增加了计算效率,增加了内存使用 (2认同)
  • 这个评论作为答案的一部分不是更好吗? (2认同)