kms*_*333 5 spring reactive-programming project-reactor spring-webflux
我正在使用spring flux向服务发送并行请求,这是非常简化的版本:
Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
} ...
Run Code Online (Sandbox Code Playgroud)
我想知道如何取消这种通量,就像在某种程度上获取对通量的引用并告诉它关闭一样.
@Edwin 的回答很准确。只要不调用 subscribe,就没有什么可以取消的,因为不会执行任何代码。
只是想添加一个例子来说明这一点。
public static void main(String[] args) throws InterruptedException {
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Disposable disposable = Flux.fromIterable(lists)
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
disposable.dispose();
Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}
Run Code Online (Sandbox Code Playgroud)
但我想说一种更简洁的方法(函数式方法)是使用像takeUntilor 这样的函数take。例如,我也可以像这样停止上面示例中的流。
List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
或者
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
.delayElements(Duration.ofSeconds(2))
.map(String::toLowerCase)
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
您可能知道,对于被动对象,所有操作员都很懒惰.这意味着管道的执行会延迟到您订阅反应流的那一刻.
所以,在你的例子中,没有什么可以取消,因为那时没有发生任何事情.
但假设您的示例扩展到:
Disposable disp = Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
}
.subscribe();
Run Code Online (Sandbox Code Playgroud)
然后,你可以看到,您的订阅返回一个Disposable对象,如果你想,你可以用它来取消整个事情,如
disp.dispose()
Run Code Online (Sandbox Code Playgroud)
处置文件说:
取消或处置基础任务或资源.
这些[运算符]变体返回对订阅的引用,当不再需要数据时,可以使用该引用来取消订阅.取消后,源应停止生成值并清除它创建的任何资源.这种取消和清除行为在Reactor中由通用
Disposable接口表示.
因此,取消流的执行并非在反应对象方面没有复杂性,因为如果在处理过程中取消流,则需要确保让世界保持一致状态.例如,如果您正在构建某些内容,则可能需要丢弃资源,销毁任何部分聚合结果,关闭文件,通道,释放内存或您拥有的任何其他资源,可能会撤消更改或对其进行补偿.
您可能希望阅读关于此的清理文档,以便您还考虑在反应对象方面可以执行的操作.
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2933 次 |
| 最近记录: |