Seb*_*nst 5 scala apache-spark spark-streaming
我使用Spark Streaming来处理导入的数据.导入数据存储在a中DStream.此外,我有类Creation和Update持有一个Foo对象.
我想要完成的任务之一是变更检测.所以我加入2个rdds(一个持有处理的数据批,另一个持有当前状态)currentState最初是空的.
val stream: DStream[Foo]
val currentState: RDD[Foo]
val changes = stream
.transform { batch =>
batch.leftouterJoin(currentState) {
case(objectNew, Some(objectOld)) => Update(objectNew)
case(objectNew, None) => Creation(objectNew)
}
}
currentState = currentState.fullOuterJoin(changes).map {
case (Some(foo), None) => foo
case (_, Some(change)) => change.foo
}
}.cache()
Run Code Online (Sandbox Code Playgroud)
之后我过滤掉了更新.
changes.filter(!_.isInstanceOf[Update])
Run Code Online (Sandbox Code Playgroud)
我现在导入两次相同的数据.由于状态最初为空,因此第一个导入的结果集由Creation对象组成,而第二个导入的结果集仅导致Update对象.所以第二个结果集changes是空的.在这种情况下,我注意到性能大幅下降.如果我省略过滤器,它工作正常.
我无法想象这是预期的行为,但也许这是Spark Computation Internals的一个问题.谁能解释为什么会这样?
| 归档时间: |
|
| 查看次数: |
456 次 |
| 最近记录: |