我正在寻找一种方法将RDD分成两个或更多RDD.我见过的最接近的是Scala Spark:拆分收集到几个RDD?这仍然是一个RDD.
如果您熟悉SAS,请执行以下操作:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
Run Code Online (Sandbox Code Playgroud)
这导致了两个不同的数据集.它必须立即坚持以获得我打算的结果......
这是我的一个火花工作的一个小问题,似乎没有引起任何问题 - 但每次我看到它并且未能提出更好的解决方案时,我都会烦恼.
假设我有一个像这样的Scala集合:
val myStuff = List(Try(2/2), Try(2/0))
Run Code Online (Sandbox Code Playgroud)
我可以使用分区将此列表分为成功和失败:
val (successes, failures) = myStuff.partition(_.isSuccess)
Run Code Online (Sandbox Code Playgroud)
这很好.分区的实现仅遍历源集合一次以构建两个新集合.但是,使用Spark,我能够设计的最佳等价物是这样的:
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
Run Code Online (Sandbox Code Playgroud)
除了解压缩Try的区别(这很好)之外,还需要遍历数据两次.这很烦人.
有没有更好的Spark替代方案可以在没有多次遍历的情况下拆分RDD?即具有这样的签名,其中分区具有Scala集合分区而不是RDD分区的行为:
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
Run Code Online (Sandbox Code Playgroud)
作为参考,我以前使用类似下面的东西来解决这个问题.可能失败的操作是从二进制格式反序列化一些数据,并且失败已经变得足够有趣,它们需要被处理并保存为RDD而不是记录的东西.
def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
try {
Some(deserialize(data))
} catch {
case e: MyDesrializationError => {
logger.error(e)
None
}
}
}
Run Code Online (Sandbox Code Playgroud)