meg*_*off 5 java spring-websocket spring-webflux
在 Reactor 3.4.0 中,不同的 FluxProcessor(例如“DirectProcessor”)已被弃用。我使用这样的处理器作为订阅者,请参见下面的示例。
现在我想知道如何迁移我的代码才能使用推荐的Sinks.many()方法?有任何想法吗?
旧代码:
DirectProcessor<String> output = DirectProcessor.create();
output.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();
Run Code Online (Sandbox Code Playgroud)
根据已弃用的 DirectProcessor 的 JavaDoc,我应该使用Sinks.many().multicast().directBestEffort(). 但我想知道如何在我的 WebSocketClient 中使用它?
迁移代码:
Many<String> sink = Sinks.many().multicast().directBestEffort();
Flux<String> flux = sink.asFlux();
flux.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribe ... // <-- how to do this with a Sink ??
.then()).block();
Run Code Online (Sandbox Code Playgroud)
感谢您提前提出任何建议。