Spring websocket 从多个线程发送消息

Sep*_* GH 5 java spring multithreading spring-websocket

我正在为我的一个基于 spring 的项目使用 Spring WebSocket 服务器实现。我遇到了一个错误说The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state. 我发现问题是同时从不同的线程写入 websocket。

我如何临时修复它:考虑我已经实现了以下方法

void sendMessageToSession(WebsocketSession session,String message);
Run Code Online (Sandbox Code Playgroud)

它将 TextMessage 发送到 websocket 会话。我无法使整个方法同步,因为多个线程可以为不同的 websocketSessions 和消息调用它。我也不能把会话放在同步块中(尝试过但没有用)

虽然,我像这样解决了我的问题

synchronized(session.getId()){ 
    //sending message;
}
Run Code Online (Sandbox Code Playgroud)

我不再面临这个问题。但是在同步块中使用字符串似乎不是一个好习惯。那么我还有什么其他解决方案?发送异步消息的最佳方式是什么?

PSConcurrentWebSocketSessionDecorator建立连接后我已经使用了,我正在使用更新的websocket。没有帮助。

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);
Run Code Online (Sandbox Code Playgroud)

注意 我将我的 websocet 会话保存在地图中,其中键是 session.getId,值是会话本身。

与其他一些 websocket 实现不同,Spring websocket 引用在每条消息上似乎并不相等。我通过他们的 ID 将会话保存在地图中,并且在每条消息上我检查传递的 websocket 与我已经放在我的地图上的 websocket 的相等性,它是错误的。

Sep*_* GH 9

By adding volatile keyword behind my WebsocketSession at where I persist my sessions, I solved the problem. I would be glad to know if this too is a bad practice. But my idea is that when writing to a websocket session from multiple threads, these threads loose the state of websocket because its not updated yet and that's why this exception is thrown.

By adding volatile, we make sure that websocket state has been updated before another thread uses it so writing to websocket works synchronized as expected.

I created a class named SessionData which holds websocketSession and all other data I need about session.

public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}
Run Code Online (Sandbox Code Playgroud)

and I used SessionData as value of the map where session IDs are keys

then when getting websocketSession from SessionData and writing into it from different threads, volatile helped me to get updated websocketSession.


Update (2020)

One key note here is that you should use sessionData.getWebsocketSession.sendMessage(...) everytime you want to send a message to the session. You should never use the session directly, which means a code like this is a bad practice:

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);
Run Code Online (Sandbox Code Playgroud)

You would never know what changes has applied to websocket session between these two lines of code (which might be more than 2 in your case).

And a code like this is better:

sessionData.getWebSocketSession().sendMessage(...);
Run Code Online (Sandbox Code Playgroud)

Also never publish directly into sessions that are passed to you inside Spring websocket MessageHandlers. Otherwise you probably get that error again.

This is why its good practice to map sessionId of WebSocketSession to SessionData when connection opens. You can use this repository to get volatile session using session id instead of using the session directly.

  • 事实并非如此。这两种选择在本质上没有区别。您需要将会话包装在 ConcurrentWebSocketSessionDecorator 中。 (2认同)

小智 7

ConcurrentWebSocketSessionDecorator就像多线程中的魅力一样,它就是为此而设计的。您的地图实施可能存在问题。

示例代码:

private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
    // Use the following will crash :
    //sessions.put(session.getId(), new SessionData(session));

    // Use ConcurrentWebSocketSessionDecorator is safe :
    sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
    super.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
    sessions.remove(session.getId());
    super.afterConnectionClosed(session, status); 
}

public void send(WebSocketSession session, String msg) throws MessagingException {
    try {
        session.sendMessage (new TextMessage(msg));
    } catch (IOException ex) {
        throw new MessagingException(ex.getMessage());
    }
}
Run Code Online (Sandbox Code Playgroud)

要轻松测试多线程中的行为:

    public void sendMT(WebSocketSession session, String msg) throws MessagingException{
    for (int i=0; i<3; i++){
        new Thread(){
          @Override
          public void run(){
              send (session, msg);
        }.start();  
    }
}
Run Code Online (Sandbox Code Playgroud)