标签: rsocket-java

WebFlux + RSocket + Spring

有人可以告诉我或给出一个使用WebFlux、RScoket 和 Spring(或 SpringBoot)的现成 CRUD 示例吗?

我研究了RSocket文档WebFlux,也写了我的简单示例,但我希望看到使用RSocket 的基本方法的真实CRUD应用程序。

我会很感激的。谢谢。

serversocket reactor spring-webflux rsocket rsocket-java

5
推荐指数
1
解决办法
2112
查看次数

从 RSocket-Java 客户端连接到 Spring Boot RSocket 服务器时出错

我在通过 TCP 连接到 Spring Boot RSocket 应用程序时遇到问题。使用 RSocketRequester 时的客户端工作正常,但是当我尝试使用 RSocketFactory 客户端连接时,它不断出错。代码如下。

        RSocket rSocket = this.client = RSocketFactory
            .connect()
            .mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .block();


Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
    s.subscribe();
Run Code Online (Sandbox Code Playgroud)

这给出了如下错误:

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-messaging rsocket rsocket-java

5
推荐指数
1
解决办法
690
查看次数

如何在 Spring Boot 中使用 rsocket 配置 SSL/TLS?

我已阅读stackoverflow 中的相关问题。但是,答案仅告诉如何配置身份验证检查。我需要在 Spring Boot 中使用 rsocket 加密所有传输的数据。如何在带有 SSL/TLS 的 Spring Boot 中使用 tls。当我初始化 rsocket 客户端时,我找不到任何支持的方法,如下所示,尽管我知道 rsocket 本身可以支持 SSL/TLS。我找到了一些例子:例子

        this.rsocketRequester = rsocketRequesterBuilder.setupRoute("sidecar-client")
            .setupData("test_data")
            .rsocketConnector(connector -> connector.acceptor(responder))
            .connectTcp("localhost", 7000)
            .block();

Run Code Online (Sandbox Code Playgroud)
Summary of my question:
1. Does rsocket in springboot support SSL/TLS ?
2. If spring boot support rsocket with SSL/TLS, are there any examples can be referenced
Run Code Online (Sandbox Code Playgroud)

spring-boot rsocket rsocket-java

5
推荐指数
1
解决办法
2412
查看次数

正确使用 LoadbalanceRSocketClient 和 Spring 的 RSocketRequester

我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。

我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:

List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
 .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());   
Run Code Online (Sandbox Code Playgroud)

配置完成后,请求者将在计时器循环中重复使用,如下所示:

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)

它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:

java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …
Run Code Online (Sandbox Code Playgroud)

spring-messaging rsocket rsocket-java

3
推荐指数
1
解决办法
596
查看次数