use*_*932 1 scala apache-spark
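I want to use flatMap to implement filter() + map(), as in the code below: three if statements decide whether a Tuple2 is emitted; otherwise an empty Array[Tuple2] is emitted.
Is there a more elegant way to implement this?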
rddx.flatMap { case (arr: Array[String]) =>
  val url_parts = arr(1).split("/")
  if (url_parts.length > 7) {
    val pid = url_parts(4)
    val lid = url_parts(7).split("_")
    if (lid.length == 2) {
      val sid = lid(0)
      val eid = lid(1)
      // eid(0) is a Char, so compare against the Char literal 'h';
      // comparing it with the String "h" is always false.
      if (eid.length > 0 && eid(0) == 'h') {
        Array((pid, 1))
      }
      else new Array[(String, Int)](0)
    }
    else Array((pid, 1))
  }
  else new Array[(String, Int)](0)
}
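You can use a for-comprehension. Of course, this desugars into a chain of flatMap, map, and filter calls, but Spark groups those into a single stage anyway, so there should be no performance penalty.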
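// Note: unlike the flatMap version in the question, rows where
// lid.length != 2 are dropped here rather than emitted.
for {
  arr <- rddx
  url_parts = arr(1).split("/")
  if url_parts.length > 7
  pid = url_parts(4)
  lid = url_parts(7).split("_")
  if lid.length == 2
  sid = lid(0)
  eid = lid(1)
  // Compare against the Char literal 'h'; eid(0) == "h" is always false.
  if eid.length > 0 && eid(0) == 'h'
} yield
  // Yield the tuple itself; yielding Array((pid, 1)) would give an RDD of arrays.
  (pid, 1)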
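To make the "chain of map and filter" point concrete, here is a minimal sketch on a plain Scala List rather than an RDD, so it runs without Spark. The sample rows and the names `ForDesugarSketch`, `viaFor`, and `viaCalls` are made up for illustration, and the explicit chain is only an approximation of what the compiler generates. On a List the guards become withFilter calls; the RDD lineage in this answer shows filter steps instead.

```scala
object ForDesugarSketch {
  // Hypothetical sample rows: column 1 holds a URL-like path.
  val rows: List[Array[String]] = List(
    Array("r1", "a/b/c/d/p1/e/f/s1_h1"),
    Array("r2", "a/b/c/d/p2/e/f/s2_x1"),
    Array("r3", "too/short")
  )

  // For-comprehension form: value definitions and guards interleaved.
  val viaFor: List[(String, Int)] =
    for {
      arr <- rows
      parts = arr(1).split("/")
      if parts.length > 7
      lid = parts(7).split("_")
      if lid.length == 2 && lid(1).startsWith("h")
    } yield (parts(4), 1)

  // Roughly equivalent explicit chain: each value definition becomes a map
  // that carries the new value along, and each guard becomes a withFilter.
  val viaCalls: List[(String, Int)] =
    rows
      .map(arr => (arr, arr(1).split("/")))
      .withFilter { case (_, parts) => parts.length > 7 }
      .map { case (_, parts) => (parts, parts(7).split("_")) }
      .withFilter { case (_, lid) => lid.length == 2 && lid(1).startsWith("h") }
      .map { case (parts, _) => (parts(4), 1) }
}
```

Both values should contain the same pairs, which is why the two spellings are interchangeable from a readability point of view.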
Here is the toDebugString output, which shows only a single stage:
scala> res.toDebugString
res2: String =
(8) MapPartitionsRDD[7] at map at <console>:24 []
| MapPartitionsRDD[6] at filter at <console>:24 []
| MapPartitionsRDD[5] at map at <console>:24 []
| MapPartitionsRDD[4] at filter at <console>:24 []
| MapPartitionsRDD[3] at map at <console>:24 []
| MapPartitionsRDD[2] at filter at <console>:24 []
| MapPartitionsRDD[1] at map at <console>:24 []
| ParallelCollectionRDD[0] at parallelize at <console>:21 []
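Another option, if you want to keep the question's exact branching (including emitting a pair when the last segment does not split into two parts), is to move the logic into a function that returns an Option and flatMap over it. This is only a sketch: `ExtractSketch`, `extract`, and the sample rows are hypothetical names, and it is demonstrated on a plain List so it runs without Spark.

```scala
object ExtractSketch {
  // Hypothetical helper: Some((pid, 1)) when the row qualifies, None otherwise.
  def extract(arr: Array[String]): Option[(String, Int)] = {
    val urlParts = arr(1).split("/")
    if (urlParts.length <= 7) None
    else {
      val pid = urlParts(4)
      urlParts(7).split("_") match {
        case Array(_, eid) if eid.startsWith("h") => Some((pid, 1)) // two parts, eid starts with 'h'
        case Array(_, _)                          => None           // two parts, other eid
        case _                                    => Some((pid, 1)) // not exactly two parts
      }
    }
  }

  // Made-up sample rows covering each branch.
  val rows: List[Array[String]] = List(
    Array("r1", "a/b/c/d/p1/e/f/s1_h1"),
    Array("r2", "a/b/c/d/p2/e/f/s2_x1"),
    Array("r3", "a/b/c/d/p3/e/f/nounderscore"),
    Array("r4", "too/short")
  )

  // flatMap drops the None results and unwraps the Some results.
  val result: List[(String, Int)] = rows.flatMap(arr => extract(arr))
}
```

On the RDD from the question the call would presumably look like rddx.flatMap(arr => extract(arr)), relying on Option being usable where flatMap expects a collection; I have not run that against a cluster.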