Spark Streaming空RDD性能问题

Seb*_*nst 5 scala apache-spark spark-streaming

我使用Spark Streaming来处理导入的数据.导入数据存储在a中DStream.此外,我有类CreationUpdate持有一个Foo对象.

我想要完成的任务之一是变更检测.所以我加入2个rdds(一个持有处理的数据批,另一个持有当前状态)currentState最初是空的.

val stream: DStream[Foo]
val currentState: RDD[Foo]

val changes = stream
    .transform { batch =>
        batch.leftouterJoin(currentState) {
            case(objectNew, Some(objectOld)) => Update(objectNew)
            case(objectNew, None) => Creation(objectNew)
        }
    }

currentState = currentState.fullOuterJoin(changes).map {
        case (Some(foo), None) => foo
        case (_, Some(change)) => change.foo
    }
}.cache()
Run Code Online (Sandbox Code Playgroud)

之后我过滤掉了更新.

changes.filter(!_.isInstanceOf[Update])
Run Code Online (Sandbox Code Playgroud)

我现在导入两次相同的数据.由于状态最初为空,因此第一个导入的结果集由Creation对象组成,而第二个导入的结果集仅导致Update对象.所以第二个结果集changes是空的.在这种情况下,我注意到性能大幅下降.如果我省略过滤器,它工作正常.

我无法想象这是预期的行为,但也许这是Spark Computation Internals的一个问题.谁能解释为什么会这样?