我有一个关于 Springs RSocketRequester 的问题。我有一个 rsocket 服务器和客户端。客户端连接到此服务器并请求 @MessageMapping 端点。它按预期工作。
但是如果我重新启动服务器怎么办。如何从客户端自动重新连接到 rsocket 服务器?谢谢
服务器:
@Controller
class RSC {
@MessageMapping("pong")
public Mono<String> pong(String m) {
return Mono.just("PONG " + m);
}
}
Run Code Online (Sandbox Code Playgroud)
客户:
@Bean
public RSocketRequester rSocketRequester() {
return RSocketRequester
.builder()
.connectTcp("localhost", 7000)
.block();
}
@RestController
class RST {
@Autowired
private RSocketRequester requester;
@GetMapping(path = "/ping")
public Mono<String> ping(){
return this.requester
.route("pong")
.data("TEST")
.retrieveMono(String.class)
.doOnNext(System.out::println);
}
}
Run Code Online (Sandbox Code Playgroud) 我最近遇到过"反应套接字"一词.到目前为止,我曾经认为websockets是完全成熟的异步风格的方法.
什么是无功插座.
这个链接(http://reactivesocket.io/)甚至讨论了websockets的比较.
假设我有一个 Kubernetes 集群,我在其中部署了使用 RSocket 进行通信的 Spring Boot 应用程序。为了相互调用,他们将使用 Kubernetes 服务名称,因此我们将依赖该“注册表”进行发现和路由。
在另一方面,Netify提供Netifi经纪人可以在Kubernetes部署。如果我理解得很好,这个代理是为了调解应用程序之间的通信,所以那些 Spring Boot RSocket 应用程序不会通过它们的 Kubernetes 服务名称进行通信,而是通过 Netifi 代理。
每种方法的优缺点是什么?
我正在尝试获取连接到 RSocket+SpringBoot Web 服务器的浏览器的远程 IP。连接是基于 WebSocket 的 RSocket。
Web 服务器是 Java-8、SpringBoot-2,使用基于 WebSocket 的 RSocket 并向浏览器发送 RequestStream。我利用 SpringBoot 自动配置进行 RSocket 设置,因此服务器端的代码非常少 - 见下文。
@Headers and MessageHeader下面的代码只是为了看看他们是否有任何可能导致远程IP的东西,没有其他原因他们在那里。
我在网络上搜索寻找答案 - 很多用于http,一些用于websockets,零用于RSocket。这 - https://github.com/rsocket/rsocket-java/issues/735 - 看起来很有希望,但无法获取 DuplexConnection 的句柄,所以那里没有雪茄。
有任何想法吗 ?谢谢 !
应用程序.yml:
spring.rsocket.server:
mapping-path: /rsocket-test
transport: websocket
server.port: 8080
Run Code Online (Sandbox Code Playgroud)
测试控制器.java
/**
* TODO: get the remote IP address and log.
* Options:
* 1. @see <a href="https://github.com/rsocket/rsocket-java/issues/735">Ability to intercept requests and access channel information such as remote address</a>.
* 2. Inject …Run Code Online (Sandbox Code Playgroud) RSocket 提供了 4 种交互模型。
Spring(和 Spring Boot)提供了 RSocket 集成,使用现有的消息传递基础设施很容易构建一个 RSocket 服务器来隐藏原始的 RSocket API。
@MessageMapping("hello")
public Mono<Void> hello(Greeting p) {
log.info("received: {} at {}", p, Instant.now());
return Mono.empty();
}
@MessageMapping("greet.{name}")
public Mono<String> greet(@DestinationVariable String name, @Payload Greeting p) {
log.info("received: {}, {} at {}", name, p, Instant.now());
return Mono.just("Hello " + name + ", " + p.getMessage() + " at " + Instant.now());
}
@MessageMapping("greet-stream")
public Flux<String> greetStream(@Payload Greeting p) {
log.info("received: {} at {}", p, …Run Code Online (Sandbox Code Playgroud) 我已经使用 RSocket 完成了整个 API 后端,我想知道如何记录我的 RSocket API ?有 OpenAPI for HTTP 之类的工具吗?
我在通过 TCP 连接到 Spring Boot RSocket 应用程序时遇到问题。使用 RSocketRequester 时的客户端工作正常,但是当我尝试使用 RSocketFactory 客户端连接时,它不断出错。代码如下。
RSocket rSocket = this.client = RSocketFactory
.connect()
.mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
s.subscribe();
Run Code Online (Sandbox Code Playgroud)
这给出了如下错误:
java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at …Run Code Online (Sandbox Code Playgroud) 我已阅读stackoverflow 中的相关问题。但是,答案仅告诉如何配置身份验证检查。我需要在 Spring Boot 中使用 rsocket 加密所有传输的数据。如何在带有 SSL/TLS 的 Spring Boot 中使用 tls。当我初始化 rsocket 客户端时,我找不到任何支持的方法,如下所示,尽管我知道 rsocket 本身可以支持 SSL/TLS。我找到了一些例子:例子
this.rsocketRequester = rsocketRequesterBuilder.setupRoute("sidecar-client")
.setupData("test_data")
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
Run Code Online (Sandbox Code Playgroud)
Summary of my question:
1. Does rsocket in springboot support SSL/TLS ?
2. If spring boot support rsocket with SSL/TLS, are there any examples can be referenced
Run Code Online (Sandbox Code Playgroud) 我有一个问题要问 Spring Cloud 的人。
大约。一年前,有关于 Spring Cloud RSocket 的精彩演示。其中一部分包括 Spring Cloud Gateway 中的 RSocket 支持。rsocket 经纪人。
现在我看到https://github.com/spring-cloud-incubator/spring-cloud-rsocket已经存档了。
我想知道:这是否意味着 Spring Cloud RSocket 已经死了?有替代品吗?Spring Cloud Gateway 中是否还有其他对 RSocket 的支持?或者Spring Cloud RSocket只是处于休眠状态并且有希望复活?
PS:我知道 Spring Boot 中的 RSocket 支持,这里有很好的描述。我感兴趣的是 Spring Cloud RSocket 功能,但我没有找到 Spring Boot RSocket 支持。
我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据
这是我的配置。
@Configuration
public class RsocketConsumerConfiguration {
@Bean
public Sinks.Many<Data> sender(){
return Sinks.many().multicast().directBestEffort();
}
}
Run Code Online (Sandbox Code Playgroud)
@Controller公共类ServerController {
@Autowired
private Sinks.Many<Data> integer;
@MessageMapping("integer")
public Flux<Data> integer() {
return integer.asFlux();
}
Run Code Online (Sandbox Code Playgroud)
@EnableBinding(IClientProcessor.class)
public class Listener {
@Autowired
private Sinks.Many<Data> integer;
@StreamListener(IClientProcessor.INTEGER)
public void integer(Data val) {
System.out.println(val);
integer.tryEmitNext(val);
}
}
Run Code Online (Sandbox Code Playgroud)
let client = new RSocketClient({
transport: new RSocketWebSocketClient(
{
url: 'ws://localhost:7000/ws',
wsCreator: (url) => new WebSocket(url),
debug: true,
},
BufferEncoders,
),
setup: {
dataMimeType: "application/json", …Run Code Online (Sandbox Code Playgroud) rsocket ×10
spring-boot ×7
java ×2
rsocket-java ×2
spring ×2
api ×1
netifi ×1
reactive ×1
reactjs ×1
reconnect ×1
spring-cloud ×1
websocket ×1