Apache Spark:处理RDD中的Option/Some/None

Ken*_*ams 5 scala exception apache-spark scala-option

我正在映射HBase表,每个HBase行生成一个RDD元素.但是,有时行有坏数据(在解析代码中抛出NullPointerException),在这种情况下我只想跳过它.

我有我的初始映射器返回一个Option表示它返回0或1个元素,然后筛选Some,然后获取包含的值:

// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    Some( ( id, ( List(x),
          // more stuff ...
        ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

有没有更惯用的方法来做更短的事情?我觉得这看起来很混乱,无论是getData()map.filter.map舞蹈中还是在舞蹈中我都在做.

也许一个flatMap可以工作(在a中生成0或1个项目Seq),但我不希望它在地图函数中展平我正在创建的元组,只是消除空白.

maa*_*asg 8

一种替代的,经常被忽略的方式是使用collect(PartialFunction pf),这意味着"选择"或"收集"在部分功能中定义的RDD中的特定元素.

代码如下所示:

val output = myRDD.collect{case Success(tuple) => tuple }

def getData(r: Result):Try[(String, List[X])] = Try {
        val id = Bytes.toString(key, 0, 11)
        val x = Long.MaxValue - Bytes.toLong(key, 11)
        (id, List(x))
}
Run Code Online (Sandbox Code Playgroud)

  • @JustinPihony没有洗牌。`collect`可以表示为:`collection.filter(x => pf.isDefinedAt(x))。map(pf)` (2认同)

cmb*_*ter 7

如果您将其更改getData为返回a,scala.util.Try那么您可以大大简化转换.像这样的东西可以工作:

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  val tr = util.Try{
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    ( id, ( List(x)
          // more stuff ...
     ) )
  } 

  tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
  tr
}
Run Code Online (Sandbox Code Playgroud)

然后你的变换可以这样开始:

myRDD.
  flatMap(tuple => getData(tuple._2).toOption)
Run Code Online (Sandbox Code Playgroud)

如果你Try是a Failure,它将被转换成一个Nonevia toOption,然后作为flatMap逻辑的一部分被删除.此时,转换中的下一步只会处理成功的情况,getData即无需包装返回的基础类型(即否Option)