Law*_*son 15 scala apache-spark spark-streaming
我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.
在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:
黄色字段代表高处理时间.
更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.
我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.
这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?
Yuv*_*kov 10
这是mapWithState()功能中的错误还是这个预期的行为?
这是预期的行为.您看到的峰值是因为您的数据在给定批次结束时被检查点.如果您注意到较长批次的时间,您会发现它每100秒持续发生一次.这是因为检查点时间是恒定的,并且是根据您的计算得出的batchDuration,除非您明确设置DStream.checkpoint间隔,否则这是您与数据源进行通信以读取批次乘以某个常量的频率.
以下是相关的代码MapWithStateDStream:
override def initialize(time: Time): Unit = {
if (checkpointDuration == null) {
checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
}
super.initialize(time)
}
Run Code Online (Sandbox Code Playgroud)
在哪里DEFAULT_CHECKPOINT_DURATION_MULTIPLIER:
private[streaming] object InternalMapWithStateDStream {
private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
Run Code Online (Sandbox Code Playgroud)
由于您的批次持续时间是每10秒=> 10*10 = 100秒,因此与您所看到的行为完全一致.
这是正常的,这就是使用Spark保持状态的成本.您的优化可能是考虑如何最小化您必须保留在内存中的状态的大小,以便尽可能快地进行此序列化.另外,确保数据遍布足够的执行程序,以便状态在所有节点之间均匀分布.此外,我希望你已经开启了Kryo Serialization而不是默认的Java序列化,这可以为你带来有意义的性能提升.