有人可以告诉我或给出一个使用WebFlux、RScoket 和 Spring(或 SpringBoot)的现成 CRUD 示例吗?
我研究了RSocket文档WebFlux,也写了我的简单示例,但我希望看到使用RSocket 的基本方法的真实CRUD应用程序。
我会很感激的。谢谢。
我在通过 TCP 连接到 Spring Boot RSocket 应用程序时遇到问题。使用 RSocketRequester 时的客户端工作正常,但是当我尝试使用 RSocketFactory 客户端连接时,它不断出错。代码如下。
RSocket rSocket = this.client = RSocketFactory
.connect()
.mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
s.subscribe();
Run Code Online (Sandbox Code Playgroud)
这给出了如下错误:
java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at …Run Code Online (Sandbox Code Playgroud) 我已阅读stackoverflow 中的相关问题。但是,答案仅告诉如何配置身份验证检查。我需要在 Spring Boot 中使用 rsocket 加密所有传输的数据。如何在带有 SSL/TLS 的 Spring Boot 中使用 tls。当我初始化 rsocket 客户端时,我找不到任何支持的方法,如下所示,尽管我知道 rsocket 本身可以支持 SSL/TLS。我找到了一些例子:例子
this.rsocketRequester = rsocketRequesterBuilder.setupRoute("sidecar-client")
.setupData("test_data")
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
Run Code Online (Sandbox Code Playgroud)
Summary of my question:
1. Does rsocket in springboot support SSL/TLS ?
2. If spring boot support rsocket with SSL/TLS, are there any examples can be referenced
Run Code Online (Sandbox Code Playgroud) 我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。
我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:
List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
HttpClient httpClient = HttpClient.create()
.baseUrl(url)
.secure(ssl ->
ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
//.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
.transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());
Run Code Online (Sandbox Code Playgroud)
配置完成后,请求者将在计时器循环中重复使用,如下所示:
@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)
它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:
java.util.concurrent.CancellationException: Pool is exhausted
at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …Run Code Online (Sandbox Code Playgroud)