Is there a more elegant way to implement filter + map in Spark?

use*_*932 1 scala apache-spark

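I want to use flatMap to implement filter() + map(), as in the code below: three nested if statements decide whether to emit a Tuple2; in every other case an empty Array[Tuple2] is emitted.

Is there a more elegant way to implement this?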

rddx.flatMap { case (arr: Array[String]) =>
  val url_parts = arr(1).split("/")
  if (url_parts.length > 7) {
    val pid = url_parts(4)
    val lid = url_parts(7).split("_")
    if (lid.length == 2) {
      val sid = lid(0)
      val eid = lid(1)
      // eid(0) is a Char, so compare it with the Char literal 'h', not the String "h"
      if (eid.length > 0 && eid(0) == 'h') {
        Array((pid, 1))
      }
      else new Array[(String, Int)](0)
    }
    else Array((pid, 1))
  }
  else new Array[(String, Int)](0)
}

Iul*_*gos 5

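You can use a for-comprehension. It will of course desugar into a chain of flatMap, map, and filter calls, but Spark groups those into a single stage anyway, so there should be no performance penalty.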

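for {
  arr <- rddx
  url_parts = arr(1).split("/")
  if url_parts.length > 7
  pid = url_parts(4)
  lid = url_parts(7).split("_")
  if lid.length == 2
  sid = lid(0)
  eid = lid(1)
  // eid(0) is a Char, so compare it with the Char literal 'h'
  if eid.length > 0 && eid(0) == 'h'
} yield
  // yield the pair itself; wrapping it in an Array would give an RDD[Array[(String, Int)]]
  (pid, 1)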
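
Note that the code in the question also emits a pair when the last URL segment does not split into exactly two parts, a branch the comprehension above drops. One way to keep all three branches and still avoid the empty arrays is a helper that returns an Option, passed to flatMap. The following is only a sketch: the names `UrlPairs`, `toPair`, and `sampleRows`, and the sample URLs, are made up for illustration, and it runs on a plain Seq so it needs no Spark cluster.

```scala
object UrlPairs {
  // Hypothetical helper: Some((pid, 1)) for rows to keep, None for rows to drop.
  def toPair(arr: Array[String]): Option[(String, Int)] = {
    val urlParts = arr(1).split("/")
    if (urlParts.length <= 7) None
    else {
      val pid = urlParts(4)
      urlParts(7).split("_") match {
        // exactly two parts, but the second does not start with 'h': drop the row
        case Array(_, eid) if !eid.startsWith("h") => None
        // two parts with an 'h' prefix, or any other number of parts: keep the row
        case _ => Some((pid, 1))
      }
    }
  }

  // Made-up sample rows; the URL sits in column 1, as in the question.
  val sampleRows: Seq[Array[String]] = Seq(
    Array("r1", "http://host/a/pid1/c/d/s1_h2"), // kept: second part starts with 'h'
    Array("r2", "http://host/a/pid2/c/d/s1_x2"), // dropped: second part does not
    Array("r3", "http://host/a/pid3/c/d/plain"), // kept: no "_" separator
    Array("r4", "http://host/short")             // dropped: too few segments
  )

  // flatMap flattens the Options: Some contributes one element, None contributes nothing.
  val pairs: Seq[(String, Int)] = sampleRows.flatMap(row => toPair(row))
  // pairs == Seq(("pid1", 1), ("pid3", 1))
}
```

The same call shape, `rddx.flatMap(row => UrlPairs.toPair(row))`, should carry over to an RDD, since an Option can be used wherever flatMap expects a collection.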

Here is the toDebugString output, which shows there is only one stage:

scala> res.toDebugString
res2: String = 
(8) MapPartitionsRDD[7] at map at <console>:24 []
 |  MapPartitionsRDD[6] at filter at <console>:24 []
 |  MapPartitionsRDD[5] at map at <console>:24 []
 |  MapPartitionsRDD[4] at filter at <console>:24 []
 |  MapPartitionsRDD[3] at map at <console>:24 []
 |  MapPartitionsRDD[2] at filter at <console>:24 []
 |  MapPartitionsRDD[1] at map at <console>:24 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
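
The alternating map/filter entries in that lineage match how a for-comprehension with value definitions and guards is rewritten by the compiler: each run of definitions becomes a map that carries the new values along, and each guard becomes a filter. A rough sketch of the equivalence on a plain Seq follows; `Desugared`, `viaFor`, and `viaChain` are illustrative names, and the hand-written chain only approximates the compiler's output (which threads all bound values through tuples).

```scala
object Desugared {
  // The comprehension from the answer, written against a Seq instead of an RDD.
  def viaFor(rows: Seq[Array[String]]): Seq[(String, Int)] =
    for {
      arr <- rows
      urlParts = arr(1).split("/")
      if urlParts.length > 7
      pid = urlParts(4)
      lid = urlParts(7).split("_")
      if lid.length == 2
      eid = lid(1)
      if eid.startsWith("h")
    } yield (pid, 1)

  // Approximately what it expands to: map, filter, map, filter, map, filter, map.
  def viaChain(rows: Seq[Array[String]]): Seq[(String, Int)] =
    rows
      .map(arr => (arr, arr(1).split("/")))
      .filter { case (_, urlParts) => urlParts.length > 7 }
      .map { case (_, urlParts) => (urlParts(4), urlParts(7).split("_")) }
      .filter { case (_, lid) => lid.length == 2 }
      .map { case (pid, lid) => (pid, lid(1)) }
      .filter { case (_, eid) => eid.startsWith("h") }
      .map { case (pid, _) => (pid, 1) }
}
```

Both functions return the same pairs for the same input. On an RDD every step in such a chain is a narrow transformation, which is consistent with the whole lineage appearing as one indented block in the toDebugString output.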