Spring WebFlux(Flux):如何动态发布

Joh*_*ang 9 spring-webflux

我是Reactive programming和Spring WebFlux的新手.我想让我的App 1通过Flux发布Server Sent事件,我的App 2继续收听它.

我希望Flux按需发布(例如,当发生某些事情时).我发现的所有示例都是使用Flux.interval来定期发布事件,并且一旦创建了Flux中的内容,似乎无法附加/修改内容.

我怎样才能实现目标?或者我在概念上完全错了.

Ole*_*uka 28

使用FluxProcessor和"动态"发布FluxSink

手动提供数据的技术之一Flux是使用FluxProcessor#sink方法,如以下示例所示

@SpringBootApplication
@RestController
public class DemoApplication {

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        sink.next("Hello World #" + counter.getAndIncrement());
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return processor.map(e -> ServerSentEvent.builder(e).build());
    }
}
Run Code Online (Sandbox Code Playgroud)

在这里,我创建DirectProcessor了以支持多个订阅者,这将监听数据流.此外,我提供了额外的功能FluxProcessor#serialize,为多产品提供安全支持(从不同线程调用而不违反Reactive Streams规范规则,尤其是规则1.3).最后,通过调用" http:// localhost:8080/send ",我们将看到该消息Hello World #1(当然,仅在以前连接到" http:// localhost:8080 "的情况下)

  • 很有意思。您是否还记得您从哪里了解到这些事情的?从未见过这种实现方式。 (2认同)

小智 5

只是另一个想法,使用 EmitterProcessor 作为通量的网关

    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    public class MyEmitterProcessor {
        EmitterProcessor<String> emitterProcessor;

        public static void main(String args[]) {

            MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
            Flux<String> publisher = myEmitterProcessor.getPublisher();
            myEmitterProcessor.onNext("A");
            myEmitterProcessor.onNext("B");
            myEmitterProcessor.onNext("C");
            myEmitterProcessor.complete();

            publisher.subscribe(x -> System.out.println(x));

        }

        public Flux<String> getPublisher() {
            emitterProcessor = EmitterProcessor.create();
            return emitterProcessor.map(x -> "consume: " + x);
        } 

        public  void onNext(String nextString) {
            emitterProcessor.onNext(nextString);
        }

        public  void complete() {
            emitterProcessor.onComplete();
        }
    }
Run Code Online (Sandbox Code Playgroud)

更多信息,请参阅此处的 Reactor 文档。文档本身有一条建议:“大多数时候,您应该尽量避免使用处理器。它们更难正确使用,并且容易出现一些极端情况。” 但我不知道哪种极端情况。

  • 同时,`EmitterProcessor` 类已被标记为已弃用,并将在 3.5 版本中删除。作为替代解决方案,建议使用“Sinks.many().multicast().onBackPressureBuffer()”(如该问题的第一个答案中所建议的)。 (5认同)