如何取消正在进行的Spring Flux?

kms*_*333 5 spring reactive-programming project-reactor spring-webflux

我正在使用spring flux向服务发送并行请求,这是非常简化的版本:

Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  } ...
Run Code Online (Sandbox Code Playgroud)

我想知道如何取消这种通量,就像在某种程度上获取对通量的引用并告诉它关闭一样.

pvp*_*ran 9

@Edwin 的回答很准确。只要不调用 subscribe,就没有什么可以取消的,因为不会执行任何代码。
只是想添加一个例子来说明这一点。

public static void main(String[] args) throws InterruptedException {

   List<String> lists = Lists.newArrayList("abc", "def", "ghi");
   Disposable disposable = Flux.fromIterable(lists)
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

   Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
   disposable.dispose();

   Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}
Run Code Online (Sandbox Code Playgroud)

但我想说一种更简洁的方法(函数式方法)是使用像takeUntilor 这样的函数take。例如,我也可以像这样停止上面示例中的流。

List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

或者

List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
                           .delayElements(Duration.ofSeconds(2))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)


Edw*_*rzo 7

您可能知道,对于被动对象,所有操作员都很懒惰.这意味着管道的执行会延迟到您订阅反应流的那一刻.

所以,在你的例子中,没有什么可以取消,因为那时没有发生任何事情.

但假设您的示例扩展到:

Disposable disp = Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  }
  .subscribe();
Run Code Online (Sandbox Code Playgroud)

然后,你可以看到,您的订阅返回一个Disposable对象,如果你想,你可以用它来取消整个事情,如

disp.dispose()
Run Code Online (Sandbox Code Playgroud)

处置文件说:

取消或处置基础任务或资源.

文档另一部分说明如下:

这些[运算符]变体返回对订阅的引用,当不再需要数据时,可以使用该引用来取消订阅.取消后,源应停止生成值并清除它创建的任何资源.这种取消和清除行为在Reactor中由通用Disposable接口表示.

因此,取消流的执行并非在反应对象方面没有复杂性,因为如果在处理过程中取消流,则需要确保让世界保持一致状态.例如,如果您正在构建某些内容,则可能需要丢弃资源,销毁任何部分聚合结果,关闭文件,通道,释放内存或您拥有的任何其他资源,可能会撤消更改或对其进行补偿.

您可能希望阅读关于此的清理文档,以便您还考虑在反应对象方面可以执行的操作.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });
Run Code Online (Sandbox Code Playgroud)