Spring @SubscribeMapping是否真的为客户订阅某个主题?

Mer*_*tce 35 spring stomp messagebroker spring-messaging spring-websocket

我正在使用Spring Websocket和STOMP,Simple Message Broker.在我的@Controller使用方法级别@SubscribeMapping,它应该将客户端订阅到主题,以便客户端之后将接收该主题的消息.假设客户订阅主题"聊天":

stompClient.subscribe('/app/chat', ...);

当客户订阅"/ app/chat "而不是"/ topic/chat"时,此订阅将转到使用以下方法映射的方法@SubscribeMapping:

@SubscribeMapping("/chat")
public List getChatInit() {
    return Chat.getUsers();
}
Run Code Online (Sandbox Code Playgroud)

这是Spring ref.说:

默认情况下,@ SubsscribeMapping方法的返回值作为消息直接发送回连接的客户端,并且不通过代理.这对于实现请求 - 回复消息交互很有用; 例如,在初始化应用程序UI时获取应用程序数据.

好的,这就是我想要的,但只是部分 !! 订阅后发送一些init-data,好吧.但订阅呢?在我看来,这里发生的事情只是一个请求 - 回复,就像一个服务.订阅只是消费.如果是这种情况,请澄清我.

  • 如果经纪人没有参与此活动,客户是否订阅了某些地方?
  • 如果以后我想向"聊天"下标者发送一些消息,客户会收到它吗?它似乎不是这样.
  • 谁真正实现了订阅?经纪人?或者其他人?

如果在这里客户端没有订阅任何地方,我想知道为什么我们称之为"订阅"; 因为客户端只收到一条消息而不是后续消息.

编辑:

为了确保订阅已经实现,我尝试的内容如下:

服务器端:

组态:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/hello").withSockJS();
    }
}
Run Code Online (Sandbox Code Playgroud)

控制器:

@Controller
public class GreetingController {

    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public Greeting greeting(HelloMessage message) throws Exception {
        System.out.println("inside greeting");
        return new Greeting("Hello, " + message.getName() + "!");
    }

    @SubscribeMapping("/topic/greetings")
    public Greeting try1() {
        System.out.println("inside TRY 1");
        return new Greeting("Hello, " + "TRY 1" + "!");
    }
}
Run Code Online (Sandbox Code Playgroud)

客户端:

...
    stompClient.subscribe('/topic/greetings', function(greeting){
                        console.log('RECEIVED !!!');
                    });
    stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
...
Run Code Online (Sandbox Code Playgroud)

我想发生什么:

  1. 当客户端订阅' /topic/greetings'时,该方法try1被执行.
  2. 当客户端将msg发送给' /app/hello'时,它应该收到问候语msg,这将是@SendTo' /topic/greetings'.

结果:

  1. 如果客户端订阅/topic/greetings,则该方法try1无法捕获它.

  2. 当客户端将msg发送给' /app/hello'时,greeting方法被执行,客户端收到了问候消息.所以我们理解它已经/topic/greetings正确地订阅了.

  3. 但请记住1.失败了.经过一番尝试后,客户端订阅时就可以了'/app/topic/greetings',即前缀为/app(这可以通过配置理解).

  4. 现在1.正在运行,但是这次2.失败:当客户端将msg发送到' /app/hello'时,是的,greeting方法已执行,但客户端没有收到问候消息.(因为可能现在客户端订阅了前缀为' /app' 的主题,这是不需要的.)

所以,我得到的是我想要的1或2,但不是这两个.

  • 如何使用此结构实现此目的(正确配置映射路径)?

Bri*_*zel 18

默认情况下,@ SubsscribeMapping方法的返回值作为消息直接发送回连接的客户端,并且不通过代理.

(强调我的)

这里的Spring Framework文档描述了响应消息发生了什么,而不是传入SUBSCRIBE消息.

那么回答你的问题:

  • 是的,客户端订阅了该主题
  • 是的,如果您使用该主题发送,则订阅该主题的客户将收到一条消息
  • 消息代理负责管理订阅

更多关于订阅管理

使用SimpleMessageBroker,消息代理实现存在于您的应用程序实例中.订阅注册由管理DefaultSubscriptionRegistry.接收消息时,SimpleBrokerMessageHandler处理SUBSCRIPTION消息和注册订阅(请参阅此处的实现).

使用像RabbitMQ这样的"真实"消息代理,您已经配置了一个将消息转发给代理的Stomp代理中继.在这种情况下,SUBSCRIBE消息将转发给代理,负责管理订阅(请参阅此处的实现).

更新 - 有关STOMP消息流的更多信息

如果您查看有关STOMP消息流的参考文档,您将看到:

  • "/ topic/greeting"的订阅通过"clientInboundChannel"传递并转发给代理
  • 发送到"/ app/greeting"的问候语通过"clientInboundChannel"传递并转发给GreetingController.控制器添加当前时间,返回值通过"brokerChannel"作为消息传递给"/ topic/greeting"(目的地根据约定选择,但可以通过@SendTo覆盖).

所以这里/topic/hello是经纪人的目的地; 发送的消息直接转发给代理.虽然/app/hello是应用程序目的地,但应该生成要发送的消息/topic/hello,除非@SendTo另有说明.

现在,您更新的问题在某种程度上是不同的,如果没有更精确的用例,很难说哪种模式最适合解决这个问题.以下是一些:

  • 您希望客户端在发生异常时发现异常:SUBSCRIBE到特定主题 /topic/hello
  • 您想要广播消息:向特定主题发送消息 /topic/hello
  • 您想获得某些内容的即时反馈,例如初始化您的应用程序的状态:SUBSCRIBE到一个应用程序目标/app/hello,控制器立即响应消息
  • 你想一个或多个消息发送到任何应用目的地/app/hello:使用的组合@MessageMapping,@SendTo或消息模板.

如果你想要一个很好的例子,那么请查看这个聊天应用程序,演示Spring websocket功能的日志以及真实世界的用例.

  • 直观地,人们可能期望除了返回控制器动作的值之外,还将自动创建订阅到`/ topic/chat.participants`,类似于`MessageMapping`转发到相应的代理目的地.但我没有看到这种情况发生,AFAICT,客户必须单独订阅`/ topic/chat.participants`. (2认同)

Pau*_*uli 16

所以有两个:

  • 使用主题来处理订阅
  • 在该主题上使用@SubscribeMapping来提供连接响应

不像你经历的那样(和我一样).

解决你的情况的方法(就像我做的那样)是:

  1. 删除@SubscribeMapping - 它只适用于/ app前缀
  2. 就像你自然一样订阅/ topic(没有/ app前缀)
  3. 实现ApplicationListener

    1. 如果要直接回复单个客户端,请使用用户目标(请参阅websocket-stomp-user-destination 或者您也可以订阅子路径,例如/ topic/my-id-42,然后您可以向此发送消息subtopic(我不知道你的确切用例,我的是我有专门的订阅,如果我想做广播,我会迭代它们)

    2. 收到StompCommand.SUBSCRIBE后,立即在ApplicationListener的onApplicationEvent方法中发送消息

订阅事件处理程序:

@Override
  public void onApplicationEvent(SessionSubscribeEvent event) {
      Message<byte[]> message = event.getMessage();
      StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
      StompCommand command = accessor.getCommand();
      if (command.equals(StompCommand.SUBSCRIBE)) {
          String sessionId = accessor.getSessionId();
          String stompSubscriptionId = accessor.getSubscriptionId();
          String destination = accessor.getDestination();
          // Handle subscription event here
          // e.g. send welcome message to *destination*
       }
  }
Run Code Online (Sandbox Code Playgroud)

  • 感谢您的帮助,特别是一段时间后.您的答案对于稍后阅读的人有用.所以+1.然而,在我的情况下,正如我记得的那样,onApplicationEvent在实际成功订阅之前运行,并且返回对subscriptor的回答,有人担心下标,即回答好像他已成功订阅,但这还不是真的.我看到默认的Spring Stomp Broker有一些重要的限制,一旦我开发了足够多的项目,我决定使用另一个"真正的"经纪人. (2认同)

mzo*_*zoz 9

嗨,Mert,虽然你的问题是 4 年前提出的,但我仍然会尝试回答它,因为我最近在同一问题上摸不着头脑并最终解决了它。

这里的关键部分是@SubscribeMapping一次性请求-响应交换,因此try1()控制器中的方法只会在客户端代码运行后被触发一次

stompClient.subscribe('/topic/greetings', callback)
Run Code Online (Sandbox Code Playgroud)

之后就没有办法触发try1()stompClient.send(...)

这里的另一个问题是控制器是应用程序消息处理程序的一部分,它接收带有前缀的目的地/app,因此为了到达@SubscribeMapping("/topic/greetings")您实际上必须编写这样的客户端代码

stompClient.subscribe('/app/topic/greetings', callback)
Run Code Online (Sandbox Code Playgroud)

由于通常topic与经纪人进行映射以避免歧义,因此我建议将您的代码修改为

@SubscribeMapping("/greetings")

stompClient.subscribe('/app/greetings', callback)
Run Code Online (Sandbox Code Playgroud)

现在console.log('RECEIVED !!!')应该可以了。

官方文档@SubscribeMapping还推荐了初始 UI 渲染的用例场景。

这什么时候有用?假设代理映射到/topic和/queue,而应用程序控制器映射到/app。在此设置中,代理存储对 /topic 和 /queue 的所有订阅,这些订阅旨在重复广播,并且不需要应用程序参与。客户端还可以订阅某个 /app 目的地,控制器可以返回一个值来响应该订阅,而无需代理参与,而无需再次存储或使用订阅(实际上是一次性请求-答复交换)。一个用例是在启动时使用初始数据填充 UI。


Sta*_*tov 5

我遇到了同样的问题,当我同时在客户端/topic/app客户端上订阅时,最终切换到解决方案,将在/topic处理程序上接收到的所有内容缓冲,直到/app-bound一个将下载所有聊天记录,即@SubscribeMapping返回。然后,我将所有最近的聊天条目与在上收到的聊天条目合并/topic-就我而言,可能有重复项。

另一种工作方法是宣布

registry.enableSimpleBroker("/app", "/topic");
registry.setApplicationDestinationPrefixes("/app", "/topic");
Run Code Online (Sandbox Code Playgroud)

显然,并不完美。但工作:)