Spark mapWithState API说明

Dan*_*nai 8 scala apache-spark spark-streaming

我一直mapWithState在Spark Streaming中使用API,但有两点不清楚StateSpec.function:

假设我的功能是:

def trackStateForKey(batchTime: Time,
                     key: Long,
                     newValue: Option[JobData],
                     currentState: State[JobData]): Option[(Long, JobData)]
Run Code Online (Sandbox Code Playgroud)
  1. 为什么新值是一种Option[T]类型?据我所知,它总是为我定义,并且因为该方法应该以新状态调用,所以我真的没有看到为什么它可以是可选的.

  2. 返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途.由于我正在使用state.remove()和修改密钥的状态state.update(),为什么我必须对返回值执行相同的操作?

    在我当前的实现中,None如果我删除密钥,我将返回,Some(newState)如果我更新它,但我不确定这是否正确.

Yuv*_*kov 18

为什么新值是一种Option[T]类型?据我所知,它总是为我定义,并且因为该方法应该以新状态调用,所以我真的没有看到为什么它可以是可选的.

这是Option[T]因为如果使用设置超时StateSpec.timeout,例如:

StateSpec.function(spec _).timeout(Milliseconds(5000))
Run Code Online (Sandbox Code Playgroud)

然后,一旦函数超时,传入的值将是None,并且isTimingOut方法on State[T]将产生true.这是有道理的,因为国家的超时并不意味着一个新的值已经到达了指定键,一般较安全传递使用nullT(这不会对工作的原语反正)如你所期望的用户安全在...上运作Option[T].

您可以在Sparks实现中看到:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}
Run Code Online (Sandbox Code Playgroud)

返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途.由于我使用state.remove()和state.update()修改键的状态,为什么我必须对返回值执行相同的操作?

返回值是一种沿着spark图传递中间状态的方法.例如,假设我想更新我的状态,但也在我的管道中使用中间数据执行某些操作,例如:

dStream
  .mapWithState(stateSpec)
  .map(optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */)
Run Code Online (Sandbox Code Playgroud)

该返回值正是允许我继续操作所述数据的原因.如果您不关心中间结果并且只想要完整状态,那么输出None就完全没问题了.

编辑:

我写了一篇博文(关注这个问题),试图对API进行深入的解释.