我正在开发一个用于 IoT 实时数据可视化的 Spring Boot WebFlux 应用程序。
我有一个Flux模拟来自设备的数据,我希望在建立每个事件的 websocket 连接时:
WebSocketHandler)RestTemplate)从我的日志来看,两个订阅者(websocket 处理程序和通知程序)似乎获得了两个具有完全不同值的不同流(在日志下方)。
我还尝试了一种在类share中链接方法的变体,在这种情况下,虽然我只有一个 Flux,但只有一个线程,所以一切都被阻塞(我可以看到 REST 调用阻止通过 websocket 发送) 。mapMySource
这里会发生什么?如何使两个订阅者在不同的执行上下文(不同的线程)中运行,从而完全相互独立?
下面是相关代码片段和日志。
谢谢大家!
更新:为了清楚起见,我必须指定s 具有随机生成的值,因此我通过使用保证具有相同值的/MyEvent解决了 @NikolaB 的答案,但我仍然希望有单独的执行上下文两个订阅者。ConnectableFluxshareFlux
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
public MySource mySource;
@Autowired
public Notifier notifier;
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<MyEvent> events = mySource.events();
events.subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
}
private …Run Code Online (Sandbox Code Playgroud) java resttemplate spring-websocket project-reactor spring-webflux
这是我在这里发表的第一篇文章,因此如果我不遵守格式规则或其他一般规则,我提前道歉。
我有一个基于 Spring JMS 的客户端应用程序,它使用多个使用者从队列中读取数据。我的限制是:在失败和消息重复检测的情况下保证重新传递。
弹簧配置
<bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.DynamicDestinationResolver" />
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="MyConnectionFactory" />
<property name="destinationResolver" ref="jndiDestinationResolver" />
<property name="receiveTimeout" value="100" />
</bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="destinationResolver" ref="jndiDestinationResolver" />
<property name="connectionFactory" ref="MyConnectionFactory" />
<property name="destinationName" value="my_Queue" />
<property name="messageListener" ref="MyListener" />
<property name="maxConcurrentConsumers" value="10"/>
<property name="sessionTransacted" value="true"/>
</bean>
Run Code Online (Sandbox Code Playgroud)
我的监听器代码类似于:
protected void processTextMessage(TextMessage textMessage) {
尝试 {
// 处理消息
} catch(异常e){
尝试 {
sendTextMessage("my_Queue",correlationID, textMessage.getText());
} catch (JMSException e1) {
log.error("将消息写回队列时出错!");
抛出 JmsUtils.convertJmsAccessException(e1);
}
}
}
protected void sendTextMessage(字符串队列名称,最终字节[]相关ID,字符串消息){ …