在tomcat集群中的Spring Websocket

Rob*_*ese 21 spring spring-mvc spring-websocket

在我们当前的应用程序中,我们使用Spring Websockets而不是STOMP.我们希望横向扩展.是否有关于如何处理多个tomcat实例上的websocket流量的最佳实践以及如何在多个节点上维护会话信息.是否有一个可以参考的工作样本?

Tha*_*Van 18

您的要求可分为2个子任务:

  1. 维护多个节点的会话信息:您可以尝试使用Redis支持的Spring Sessions群集(请参阅:使用Redis的HttpSession).这非常简单并且已经支持Spring Websockets(参见:Spring Session和WebSockets).

  2. 处理多个tomcat实例上的websockets流量:有几种方法可以做到这一点.

    • 第一种方式:使用功能齐全的代理(例如:ActiveMQ)并尝试新功能支持多个WebSocket服务器(来自:4.2.0 RC1)
    • 第二种方式:使用全功能代理并实现分布式UserSessionRegistry(例如:Using Redis:D).DefaultUserSessionRegistry使用内存存储的默认实现.

更新:我使用Redis编写了一个简单的实现,如果您感兴趣,请尝试使用它

要配置功能齐全的代理(代理中继),您可以尝试:

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    ...

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("localhost") // broker host
            .setRelayPort(61613) // broker port
            ;
        config.setApplicationDestinationPrefixes("/app");
    }

    @Bean
    public UserSessionRegistry userSessionRegistry() {
        return new RedisUserSessionRegistry(redisConnectionFactory);
    }

    ...
}
Run Code Online (Sandbox Code Playgroud)

import java.util.Set;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.user.UserSessionRegistry;
import org.springframework.util.Assert;

/**
 * An implementation of {@link UserSessionRegistry} backed by Redis.
 * @author thanh
 */
public class RedisUserSessionRegistry implements UserSessionRegistry {

    /**
     * The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
     */
    static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";

    private final RedisOperations<String, String> sessionRedisOperations;

    @SuppressWarnings("unchecked")
    public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
        this(createDefaultTemplate(redisConnectionFactory));
    }

    public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
        Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
        this.sessionRedisOperations = sessionRedisOperations;
    }

    @Override
    public Set<String> getSessionIds(String user) {
        Set<String> entries = getSessionBoundHashOperations(user).members();
        return (entries != null) ? entries : Collections.<String>emptySet();
    }

    @Override
    public void registerSessionId(String user, String sessionId) {
        getSessionBoundHashOperations(user).add(sessionId);
    }

    @Override
    public void unregisterSessionId(String user, String sessionId) {
        getSessionBoundHashOperations(user).remove(sessionId);
    }

    /**
     * Gets the {@link BoundHashOperations} to operate on a username
     */
    private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
        String key = getKey(username);
        return this.sessionRedisOperations.boundSetOps(key);
    }

    /**
     * Gets the Hash key for this user by prefixing it appropriately.
     */
    static String getKey(String username) {
        return BOUNDED_HASH_KEY_PREFIX + username;
    }

    @SuppressWarnings("rawtypes")
    private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
        Assert.notNull(connectionFactory, "connectionFactory cannot be null");
        StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

}
Run Code Online (Sandbox Code Playgroud)


Jor*_*ozi 18

水平扩展WebSockets实际上与水平扩展无状态/有状态HTTP的应用程序非常不同.

水平扩展无状态HTTP应用程序:只需在不同的计算机中启动一些应用程序实例,并将负载均衡器放在它们前面.有很多不同的负载均衡器解决方案,如HAProxy,Nginx等.如果您在AWS等云环境中,您也可以使用Elastic Load Balancer等托管解决方案.

水平扩展状态HTTP应用程序:如果我们可以让所有应用程序每次都无状态,那将是很棒的,但不幸的是,这并不总是可行的.因此,在处理有状态HTTP应用程序时,您必须关心HTTP会话,它基本上是每个不同客户端的本地存储,其中Web服务器可以存储跨不同HTTP请求保存的数据(例如处理购物时)大车).好吧,在这种情况下,当横向扩展时,您应该知道,正如我所说,它是一个LOCAL存储,因此ServerA将无法处理ServerB上的HTTP会话.换句话说,如果出于任何原因,ServerA服务的Client1突然开始由ServerB服务,他的HTTP会话将会丢失(他的购物车将会消失!).原因可能是节点故障甚至是部署.为了解决此问题,您不能仅在本地保留HTTP会话,也就是说,您必须将它们存储在另一个外部组件上.这是能够处理这个问题的几个组件,例如任何关系数据库,但这实际上是一个开销.一些NoSQL数据库可以很好地处理这种键值行为,例如Redis.现在,由于HTTP会话存储在Redis上,如果客户端开始由另一台服务器提供服务,它将从Redis获取客户端的HTTP会话并将其加载到其内存中,因此一切都将继续工作,用户不会丢失他的HTTP会话了.您可以使用Spring Session轻松地在Redis上存储HTTP会话.

水平扩展WebSocket应用程序:当建立WebSocket连接时,服务器必须保持与客户端的连接打开,以便它们可以在两个方向上交换数据.当客户端正在侦听诸如"/topic/public.messages"之类的目的地时,我们说客户端已订阅此目的地.在Spring中,当您使用该simpleBroker方法时,订阅将保留在内存中,因此,例如,如果Client1由ServerA提供服务并且想要使用WebSocket将消息发送到ServerB服务的Client2,会发生什么?你已经知道了答案!邮件将不会传递给Client2,因为Server1甚至不知道Client2的订阅.因此,为了解决此问题,您还必须外部化WebSockets订阅.当您使用STOMP作为子协议时,您需要一个可以充当外部STOMP代理的外部组件.有很多工具可以做到这一点,但我建议使用RabbitMQ.现在,您必须更改Spring配置,以便它不会将订阅保留在内存中.相反,它会将订阅委托给外部STOMP代理.您可以使用一些基本配置轻松实现此目的enableStompBrokerRelay.需要注意的重要一点是HTTP会话与WebSocket会话不同.使用Spring Session在Redis中存储HTTP会话与水平扩展WebSockets完全无关.

我用Spring Boot编写了一个完整的Web Chat应用程序(以及更多),它使用RabbitMQ作为一个完整的外部STOMP代理,它在GitHub上公开,所以请克隆它,在你的机器上运行应用程序并查看代码详细信息.

当谈到WebSocket连接丢失时,Spring无法做到这一点.实际上,重新连接必须由实现重新连接回调函数的客户端请求(例如,WebSocket握手流,客户端必须启动握手,而不是服务器).有一些客户端库可以为您透明地处理这个问题.那不是SockJS案.在聊天应用程序中,我还实现了这种重新连接功能.