Spring 5 FluxSink在执行fluxSink.next时不发送数据

Rod*_*rta 5 java project-reactor reactive spring-webflux

我有一个GET方法的例子,它生成一个Randon INT并发送回客户端

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {

            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();


    });
}
Run Code Online (Sandbox Code Playgroud)

卷曲:

curl -v -H"Accept:text/event-stream"-X GET'http :// localhost:8080/stream

使用CURL命令使用此代码,我只能在fluxSink.complete发生时查看客户端中的数字.

现在如果我将我的代码更改为:

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {
        Thread t = new Thread(() -> {
            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();
        });
        t.start();
    });
}
Run Code Online (Sandbox Code Playgroud)

将进程包装到Thread中它可以正常工作.当fluxSink.next发生时,我可以看到数据传输正常.

谁能解释这个效果?如何在不明确使用线程的情况下查看数据流?

谢谢!

Bri*_*zel 2

Flux.create()是关于“通过 FluxSink API 以同步或异步方式发出多个元素”。在您的情况下,您同步发出所有元素,并且您没有机会让 Reactor 工作,因为在一切完成之前您永远不会退出该方法。简而言之,您正在阻塞当前线程,这是 Reactor 所禁止的。

使用线程为 Reactor 提供了发送这些信号的机会。但其精神Flux.create()(如其 javadoc 所示)是关于适应回调。

在这种情况下,也许Flux.generate会更合适。尝试用 来模拟延迟Thread并不是一个好主意。你可以这样做:

Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));
Run Code Online (Sandbox Code Playgroud)