小编vor*_*lex的帖子

如何让 Flux 的多个订阅者在不同的执行上下文/线程上运行

我正在开发一个用于 IoT 实时数据可视化的 Spring Boot WebFlux 应用程序。

我有一个Flux模拟来自设备的数据,我希望在建立每个事件的 websocket 连接时:

  • 必须通过 websocket 发送以进行实时可视化(使用reactive WebSocketHandler
  • 必须根据给定条件进行检查,以便通过 HTTP REST 调用发送通知 ( RestTemplate)

从我的日志来看,两个订阅者(websocket 处理程序和通知程序)似乎获得了两个具有完全不同值的不同流(在日志下方)。

我还尝试了一种在类share中链接方法的变体,在这种情况下,虽然我只有一个 Flux,但只有一个线程,所以一切都被阻塞(我可以看到 REST 调用阻止通过 websocket 发送) 。mapMySource

这里会发生什么?如何使两个订阅者在不同的执行上下文(不同的线程)中运行,从而完全相互独立?

下面是相关代码片段和日志。

谢谢大家!

更新:为了清楚起见,我必须指定s 具有随机生成的值,因此我通过使用保证具有相同值的/MyEvent解决了 @NikolaB 的答案,但我仍然希望有单独的执行上下文两个订阅者。ConnectableFluxshareFlux

public class MyWebSocketHandler implements WebSocketHandler {

   @Autowired
   public MySource mySource;

   @Autowired
   public Notifier notifier;

   public Mono<Void> handle(WebSocketSession webSocketSession) {
            Flux<MyEvent> events = mySource.events();
            events.subscribe(event -> notifier.sendNotification(event));
            return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
   }

   private …
Run Code Online (Sandbox Code Playgroud)

java resttemplate spring-websocket project-reactor spring-webflux

6
推荐指数
1
解决办法
1万
查看次数

Spring JMS - 通过将消息发送回同一队列而不是回滚事务来重新传递

这是我在这里发表的第一篇文章,因此如果我不遵守格式规则或其他一般规则,我提前道歉。

我有一个基于 Spring JMS 的客户端应用程序,它使用多个使用者从队列中读取数据。我的限制是:在失败和消息重复检测的情况下保证重新传递。

弹簧配置

<bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.DynamicDestinationResolver" />


    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="MyConnectionFactory" />
        <property name="destinationResolver" ref="jndiDestinationResolver" />
        <property name="receiveTimeout" value="100" />
    </bean>


    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="destinationResolver" ref="jndiDestinationResolver" />
            <property name="connectionFactory" ref="MyConnectionFactory" />
            <property name="destinationName" value="my_Queue" />
            <property name="messageListener" ref="MyListener" />
            <property name="maxConcurrentConsumers" value="10"/>
            <property name="sessionTransacted" value="true"/>
    </bean>
Run Code Online (Sandbox Code Playgroud)

我的监听器代码类似于:

    protected void processTextMessage(TextMessage textMessage) {
        尝试 {
            // 处理消息
        } catch(异常e){
            尝试 {
                sendTextMessage("my_Queue",correlationID, textMessage.getText());
            } catch (JMSException e1) {
                log.error("将消息写回队列时出错!");
                抛出 JmsUtils.convertJmsAccessException(e1);

            }
        }
    }


    protected void sendTextMessage(字符串队列名称,最终字节[]相关ID,字符串消息){ …

queue spring transactions rollback spring-jms

5
推荐指数
1
解决办法
1万
查看次数