meu*_*det 5 spring reactjs spring-boot spring-cloud-stream rsocket
我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据
这是我的配置。
@Configuration
public class RsocketConsumerConfiguration {
@Bean
public Sinks.Many<Data> sender(){
return Sinks.many().multicast().directBestEffort();
}
}
Run Code Online (Sandbox Code Playgroud)
@Controller公共类ServerController {
@Autowired
private Sinks.Many<Data> integer;
@MessageMapping("integer")
public Flux<Data> integer() {
return integer.asFlux();
}
Run Code Online (Sandbox Code Playgroud)
@EnableBinding(IClientProcessor.class)
public class Listener {
@Autowired
private Sinks.Many<Data> integer;
@StreamListener(IClientProcessor.INTEGER)
public void integer(Data val) {
System.out.println(val);
integer.tryEmitNext(val);
}
}
Run Code Online (Sandbox Code Playgroud)
let client = new RSocketClient({
transport: new RSocketWebSocketClient(
{
url: 'ws://localhost:7000/ws',
wsCreator: (url) => new WebSocket(url),
debug: true,
},
BufferEncoders,
),
setup: {
dataMimeType: "application/json",
metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
keepAlive: 5000,
lifetime: 60000,
},
});
client
.then(rsocket => {
console.log("Connected to rsocket");
rsocket.requestStream({
metadata: Buffer.from(encodeCompositeMetadata([
[MESSAGE_RSOCKET_ROUTING, encodeRoute("integer")],
])),
})
.subscribe({
onSubscribe: s => {
s.request(2147483647)
},
onNext: (p) => {
let newData = {
time: new Date(JSON.parse(p.data).time).getUTCSeconds(),
integer: JSON.parse(p.data).integer
}
newData.integer >100?setInteger(currentData => [newData, ...currentData]):setInt(currentData => [newData, ...currentData])
console.log(newData)
},
onError: (e) => console.error(e),
onComplete: () => console.log("Done")
});
Run Code Online (Sandbox Code Playgroud)
spring.cloud.stream.bindings.integer.destination=integer 无法在反应应用程序中看到它。请指教。我做错了什么?
鉴于数据似乎直接从 Kafka(通过 Spring)发送到客户端,也许通过互联网消息传递代理将 Kafka 消息通过 WebSockets 流式传输到面向互联网的客户端会更有意义。
披露:我不是该文章的作者,但在作者工作的公司工作。我们经常看到这种用例,因此预计这种方法可能有用。
| 归档时间: |
|
| 查看次数: |
616 次 |
| 最近记录: |