Kap*_*lny 3 java reactive-programming project-reactor
我试图找出Reactor Project,我正在寻找一种取消订阅的方法.我知道在进行例如Flux的订阅后,我可以获得可用于发送onCancel信号的Cancellation对象的引用,但这只是在订阅之后我需要在某种Collection中保存该引用.
有没有更好的方法来获得取消对象?或者只是取消订阅.也许某种地方包含对所有活跃订阅的引用 - 是的,这将是非常棒的......
在Reactor中,Subscription在你调用之前想要取消a是没有意义的subscribe()(因为正是这种方法创建Subscription并传播信号链以启动数据发射).
所有订阅都没有集中的地方,这没有多大意义,因为您需要一种方法来查找要取消的特定订阅(并且请记住,链中的每个运营商也可以使用中间订阅...).
请注意,一些运营商也会代表您取消订阅!这是的情况下take(int),例如,这将取消上游一旦足够的项目已发出:
Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
将输出:
14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel()
Run Code Online (Sandbox Code Playgroud)