WebFlux 和 Reactor 3.4.0 - 已弃用的 FluxProcessors - 如何使用接收器订阅?

meg*_*off 5 java spring-websocket spring-webflux

在 Reactor 3.4.0 中,不同的 FluxProcessor(例如“DirectProcessor”)已被弃用。我使用这样的处理器作为订阅者,请参见下面的示例。

现在我想知道如何迁移我的代码才能使用推荐的Sinks.many()方法?有任何想法吗?

旧代码:

DirectProcessor<String> output = DirectProcessor.create();
output.subscribe(msg -> System.out.println(msg));

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> 
    // send message
    session.send(Mono.just(session.textMessage(command)))
        .thenMany(session.receive()
        .map(message -> message.getPayloadAsText())
        .subscribeWith(output))
    .then()).block();
Run Code Online (Sandbox Code Playgroud)

根据已弃用的 DirectProcessor 的 JavaDoc,我应该使用Sinks.many().multicast().directBestEffort(). 但我想知道如何在我的 WebSocketClient 中使用它?

迁移代码:

Many<String> sink = Sinks.many().multicast().directBestEffort();        
Flux<String> flux = sink.asFlux();
flux.subscribe(msg -> System.out.println(msg));

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> 
    // send message
    session.send(Mono.just(session.textMessage(command)))
        .thenMany(session.receive()
        .map(message -> message.getPayloadAsText())
        .subscribe ...   // <-- how to do this with a Sink ??
    .then()).block();

Run Code Online (Sandbox Code Playgroud)

感谢您提前提出任何建议。