Dan*_*nai 8 scala apache-spark spark-streaming
我一直mapWithState在Spark Streaming中使用API,但有两点不清楚StateSpec.function:
假设我的功能是:
def trackStateForKey(batchTime: Time,
key: Long,
newValue: Option[JobData],
currentState: State[JobData]): Option[(Long, JobData)]
Run Code Online (Sandbox Code Playgroud)
为什么新值是一种Option[T]类型?据我所知,它总是为我定义,并且因为该方法应该以新状态调用,所以我真的没有看到为什么它可以是可选的.
返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途.由于我正在使用state.remove()和修改密钥的状态state.update(),为什么我必须对返回值执行相同的操作?
在我当前的实现中,None如果我删除密钥,我将返回,Some(newState)如果我更新它,但我不确定这是否正确.
Yuv*_*kov 18
为什么新值是一种
Option[T]类型?据我所知,它总是为我定义,并且因为该方法应该以新状态调用,所以我真的没有看到为什么它可以是可选的.
这是Option[T]因为如果使用设置超时StateSpec.timeout,例如:
StateSpec.function(spec _).timeout(Milliseconds(5000))
Run Code Online (Sandbox Code Playgroud)
然后,一旦函数超时,传入的值将是None,并且isTimingOut方法on State[T]将产生true.这是有道理的,因为国家的超时并不意味着一个新的值已经到达了指定键,一般较安全传递使用null的T(这不会对工作的原语反正)如你所期望的用户安全在...上运作Option[T].
您可以在Sparks实现中看到:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
mappedData ++= returned
newStateMap.remove(key)
}
}
Run Code Online (Sandbox Code Playgroud)
返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途.由于我使用state.remove()和state.update()修改键的状态,为什么我必须对返回值执行相同的操作?
返回值是一种沿着spark图传递中间状态的方法.例如,假设我想更新我的状态,但也在我的管道中使用中间数据执行某些操作,例如:
dStream
.mapWithState(stateSpec)
.map(optionIntermediateResult.map(_ * 2))
.foreachRDD( /* other stuff */)
Run Code Online (Sandbox Code Playgroud)
该返回值正是允许我继续操作所述数据的原因.如果您不关心中间结果并且只想要完整状态,那么输出None就完全没问题了.
我写了一篇博文(关注这个问题),试图对API进行深入的解释.