Ale*_*zel 4 spring-boot spring-websocket spring-webflux
New Spring在Spring文档中有一些WebSocketClient示例。
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);
Run Code Online (Sandbox Code Playgroud)
但这很短,也不清楚:
有人可以提供更复杂的例子吗?
UPD。我试图做类似的事情:
public Flux<String> getStreaming() {
WebSocketClient client = new ReactorNettyWebSocketClient();
EmitterProcessor<String> output = EmitterProcessor.create();
Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");
Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
session -> session
.send(input.map(session::textMessage))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
.then());
return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)
但这仅返回一条消息。就像我没有订阅。
我假设您正在使用“回声”服务。为了从服务中获取一些消息,您必须将它们推入websocket,服务将“回显”它们给您。
在示例代码中,您仅向websocket写入了一个元素。一旦您将更多消息推入套接字,您将获得更多回报。
我修改了代码以连接到ws://echo.websocket.org本地服务。浏览到/stream每一秒时,都会显示一条新消息。
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {
Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
.delayElements(Duration.ofSeconds(1));
WebSocketClient client = new ReactorNettyWebSocketClient();
EmitterProcessor<String> output = EmitterProcessor.create();
Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then());
return output.doOnSubscribe(s -> sessionMono.subscribe());
}
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助...
| 归档时间: |
|
| 查看次数: |
1014 次 |
| 最近记录: |