Reactor取消订阅的方式

Kap*_*lny 3 java reactive-programming project-reactor

我试图找出Reactor Project,我正在寻找一种取消订阅的方法.我知道在进行例如Flux的订阅后,我可以获得可用于发送onCancel信号的Cancellation对象的引用,但这只是在订阅之后我需要在某种Collection中保存该引用.

有没有更好的方法来获得取消对象?或者只是取消订阅.也许某种地方包含对所有活跃订阅的引用 - 是的,这将是非常棒的......

Sim*_*slé 5

在Reactor中,Subscription在你调用之前想要取消a是没有意义的subscribe()(因为正是这种方法创建Subscription并传播信号链以启动数据发射).

所有订阅都没有集中的地方,这没有多大意义,因为您需要一种方法来查找要取消的特定订阅(并且请记住,链中的每个运营商也可以使用中间订阅...).

请注意,一些运营商也会代表您取消订阅!这是的情况下take(int),例如,这将取消上游一旦足够的项目已发出:

Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

将输出:

14:17:48.729 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | cancel()
Run Code Online (Sandbox Code Playgroud)

  • 我建议您更喜欢使用 `Cancellation` 对象。请注意,它将在 3.1 中变成一个 `Disposable`(此时你必须调用 `dispose()` 而不是 `cancel()`)。或者研究可以自然地匹配您想要做的事情的运算符,并在需要时为您取消......将异常抛出到通量中听起来不是一个很好的解决方案,这取决于您的用例。 (2认同)