How do I use convertAndSendToUser() with an external broker such as RabbitMQ in Spring4?

Pau*_*ond 8 java spring spring-boot spring-websocket

I am trying to configure web socket support in Spring 4 using RabbitMQ as the external broker, but as soon as I switch to RabbitMQ, I get the following error on start-up in the client:

'/user/queue/changes' is not a valid destination.
Valid destination types are: /temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/.
Run Code Online (Sandbox Code Playgroud)

On the server I am using convertAndSendToUser and this works fine with the simple broker, but as soon as I switch to RabbitMQ, I get this error. Note that RabbitMQ works fine for normal topic broadcasts - it's just the /user channel that falls over.

我需要做一些特别的事情/user才能使用 RabbitMQ 吗?

编辑以包含 Web 套接字配置

WebSocketConfig是非常标准的,有一些定制可以将它与spring-session集成:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {


@Override
public void configureStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/changes").withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    //config.enableSimpleBroker("/queue", "/topic");
    StompBrokerRelayRegistration r = config.enableStompBrokerRelay("/user", "/topic");

    try {
        String rabbitUrl = System.getenv("CLOUDAMQP_URL");
        if(rabbitUrl != null) { 
            log.info("RABBIT URL detected: " + rabbitUrl);
            URI uri = new URI(rabbitUrl);
            String host = uri.getHost();                
            String login = uri.getUserInfo().split(":",2)[0];
            String passCode = uri.getUserInfo().split(":",2)[1];
            String vhost = uri.getPath().substring(1);
            r.setRelayHost(host);
            r.setSystemLogin(login);    
            r.setSystemPasscode(passCode);
            r.setClientLogin(login);    
            r.setClientPasscode(passCode);
            r.setVirtualHost(vhost);                
        }
    } catch(Exception e) {
        log.error("Error setting up RabbitMQ", e);
    }
    config.setApplicationDestinationPrefixes("/app");
  }
}
Run Code Online (Sandbox Code Playgroud)

Kan*_*ane 2

我遇到了类似的错误,但在网上搜索后没有找到直观的解决方案。

我想在阅读RabbitMQ STOMP文档后分享我的发现。

根据RabbitMQ STOMP的文档,只允许以/exchange, /queue, /amq/queue,/topic和开头的目的地。temp-queue目的地/user/*将不被允许。您可以根据消息的要求选择目的地。

/exchange -- 发送到任意路由键并订阅任意绑定模式;

/queue -- 发送和订阅到 STOMP 网关管理的队列;

/amq/queue -- 发送和订阅到 STOMP 网关外部创建的队列;

/topic -- 发送和订阅临时和持久的主题;

/temp-queue/ -- 创建临时队列(仅在回复标头中)。

例如,我想向某个主题发送一条消息来通知所有订阅者。

对于将每条消息的副本传递给所有活动订阅者的简单主题目标,可以使用/topic/< name >形式的目标。主题目的地支持 AMQP 主题交换的所有路由模式。

发送到没有活动订阅者的主题目标的消息将被简单地丢弃。

AMQP 0-9-1 语义

对于 SEND 帧,消息使用路由键 < name >发送到amq.topic交换。

对于 SUBSCRIBE 帧,会创建一个自动删除的非持久队列,并使用路由键 < name > 绑定到amq.topic交换。针对队列创建订阅。

该规范意味着带有目标的stomp子消息/topic/<name>将使用rabbitmq的默认交换amp.topic ,然后如果变量名不存在,则可能会使用该变量名称创建绑定,并且还会创建一个队列来绑定交换amp.topic那个绑定。

如果要创建持久订阅,客户端应发送带有以下标头的订阅消息。

durable:true
auto-delete:false 
Run Code Online (Sandbox Code Playgroud)

在我的应用程序中,websocket 服务器接收消息/watch/{liveid},然后回复另一条消息到 topic /topic/watchinfo-{liveid}

@Secured(User.ROLE_USER)
@MessageMapping("/watch/{liveid}")
@SendTo("/topic/watchinfo-{liveid}")
@JsonView(View.Live.class)
public LiveWatchInfoMessage liveinfo(@DestinationVariable("liveid") String liveid,
                                         @AuthenticationPrincipal UserDetails activeUser) {
        ...
        return LiveWatchInfoMessage.builder().build();
    }
Run Code Online (Sandbox Code Playgroud)