Reactor StepVerifier.withVirtualTime blocks indefinitely

dea*_*mon 4 java project-reactor

I am trying to use Reactor's virtual time feature, but the test either blocks indefinitely (with no timeout) or throws an AssertionError (with a timeout):

@Test
public void test() {
    StepVerifier.withVirtualTime(() -> 
            Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)))
            .expectSubscription()
            .expectNextCount(4)
            .expectComplete()
            .verify(Duration.ofSeconds(10));
}

The exception is:

java.lang.AssertionError: VerifySubscriber timed out on reactor.core.publisher.FluxConcatMap$ConcatMapImmediate@66d1af89

The same example with real time works as expected:

@Test
public void test2() {
    StepVerifier.create(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)))
            .expectSubscription()
            .expectNextCount(4)
            .expectComplete()
            .verify(Duration.ofSeconds(10));
}

I followed the Manipulating Time section of the reference guide and cannot see the error in my first example.

What is wrong?

Sim*_*slé 10

You need to use .thenAwait(Duration); otherwise the (virtual) clock never moves and the delays never elapse. You can also use .expectNoEvent(Duration) after expectSubscription().

For example:

@Test
public void test() {
  StepVerifier.withVirtualTime(() ->
        Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)))
        .expectSubscription() // t == 0
        // move the clock forward by 1s, and check nothing is emitted in the meantime
        .expectNoEvent(Duration.ofSeconds(1))
        // so this effectively verifies the first value is delayed by 1s:
        .expectNext(1)
        // and so on...
        .expectNoEvent(Duration.ofSeconds(1))
        .expectNext(2)
        // or move the clock forward by 2s, allowing events to take place,
        // and check the last 2 values were delayed
        .thenAwait(Duration.ofSeconds(2))
        .expectNext(3, 4)
        .expectComplete()
        // trigger the verification and check that in real time it ran in under 200ms
        .verify(Duration.ofMillis(200));
}
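To see why the original test hangs, it may help to look at a toy model of a virtual clock. The sketch below is plain Java and is not Reactor's implementation: the class VirtualClockSketch and its methods schedule, advanceBy and demo are hypothetical names made up for illustration. It assumes, as a simplification of delayElements, that element i simply becomes due after i virtual seconds. The point is that delayed tasks only run when something explicitly advances the clock, which is the role .thenAwait(Duration) and .expectNoEvent(Duration) play in the test above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy virtual clock for illustration only; NOT Reactor's scheduler. */
final class VirtualClockSketch {

    /** A task plus the virtual time (in ms) at which it becomes due. */
    private static final class Timed {
        final long dueMillis;
        final long seq; // insertion order, used as a tie-breaker
        final Runnable task;

        Timed(long dueMillis, long seq, Runnable task) {
            this.dueMillis = dueMillis;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>(
            Comparator.comparingLong((Timed t) -> t.dueMillis)
                      .thenComparingLong(t -> t.seq));
    private long nowMillis = 0;
    private long counter = 0;

    /** Registers a task; nothing runs until the clock is advanced. */
    void schedule(Runnable task, Duration delay) {
        queue.add(new Timed(nowMillis + delay.toMillis(), counter++, task));
    }

    /** Moves virtual time forward and runs every task that became due. */
    void advanceBy(Duration amount) {
        long target = nowMillis + amount.toMillis();
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Timed next = queue.poll();
            nowMillis = next.dueMillis;
            next.task.run();
        }
        nowMillis = target;
    }

    /** Schedules 1..4 one virtual second apart, then advances the clock once. */
    static List<Integer> demo(Duration advance) {
        VirtualClockSketch clock = new VirtualClockSketch();
        List<Integer> received = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            final int value = i;
            clock.schedule(() -> received.add(value), Duration.ofSeconds(i));
        }
        clock.advanceBy(advance);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(Duration.ZERO));         // prints []
        System.out.println(demo(Duration.ofSeconds(2))); // prints [1, 2]
        System.out.println(demo(Duration.ofSeconds(4))); // prints [1, 2, 3, 4]
    }
}
```

With a zero advance nothing is ever delivered, no matter how long the caller waits in real time, which mirrors the question's test: it has no step that moves the virtual clock, so .verify(Duration.ofSeconds(10)) can only time out.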