Car*_*cas 35 apache-spark rdd pyspark
我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.
如果您熟悉SAS,请执行以下操作:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
Run Code Online (Sandbox Code Playgroud)
这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......
zer*_*323 58
从单个转换*中生成多个RDD是不可能的.如果要拆分RDD,则必须filter为每个拆分条件应用a .例如:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Run Code Online (Sandbox Code Playgroud)
如果你只有一个二元条件并且计算成本很高,你可能更喜欢这样的东西:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
Run Code Online (Sandbox Code Playgroud)
它意味着只有一个谓词计算,但需要额外传递所有数据.
重要的是要注意,只要输入RDD被正确缓存并且没有关于数据分布的额外假设,当涉及重复过滤器和具有嵌套if-else的for循环之间的时间复杂度时,没有显着差异.
对于N个元素和M条件,您必须执行的操作数明显与N次M成比例.在for循环的情况下,它应该更接近(N + MN)/ 2并且重复过滤器恰好是NM但是在结束时那天它只不过是O(NM).你可以和Jason Lenderman一起看看我的讨论**,了解一些利弊.
在很高的层次上你应该考虑两件事:
Spark转换是懒惰的,直到你执行一个动作你的RDD没有实现
为什么这有关系?回到我的例子:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Run Code Online (Sandbox Code Playgroud)
如果以后我决定我只需要rdd_odd那么没有理由实现rdd_even.
如果您看一下您的SAS示例来计算,work.split2您需要实现输入数据和work.split1.
RDD提供声明性API.当您使用filter或map完全由Spark引擎完成此操作时.只要传递给转换的函数是免费的副作用,它就会创建多种优化整个管道的可能性.
在一天结束时,这个案例并不足以证明自己的转型是正确的.
带有过滤器模式的映射实际上用在核心Spark中.请参阅我对Sparks RDD.randomSplit如何实际拆分RDD以及该方法的相关部分的回答randomSplit.
如果唯一的目标是在输入上实现拆分,则可以使用partitionBy以下DataFrameWriter文本输出格式的子句:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
Run Code Online (Sandbox Code Playgroud)
*Spark中只有3种基本类型的转换:
其中T,U,W可以是原子类型或产品 /元组(K,V).必须使用上述的某些组合来表达任何其他操作.您可以查看原始RDD文件了解更多详情.
** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
***另请参阅Scala Spark:拆分收集到几个RDD?
与上面提到的其他海报一样,没有单一的原生RDD变换可以分割RDD,但是这里有一些"多路复用"操作可以有效地模拟RDD上的各种"分裂",而无需多次读取:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
一些特定于随机拆分的方法:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
方法可从开源silex项目获得:
https://github.com/willb/silex
一篇博文解释了它们的工作原理:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
Run Code Online (Sandbox Code Playgroud)
正如其他地方所提到的,这些方法确实涉及内存的速度折衷,因为它们通过"急切地"而不是"懒惰地"计算整个分区结果来进行操作.因此,这些方法可能会在大型分区上遇到内存问题,而更大的传统惰性转换则不会.
| 归档时间: |
|
| 查看次数: |
43807 次 |
| 最近记录: |