Netty获取了一个exceptionCaught()事件被触发,它在TextWebsocketEncoder的管道尾部到达

use*_*898 5 java exception netty

我尝试进行简单的Web套接字解码然后编码,但是当它传递TextWebsocketDecoder处理程序时我得到了这个异常:

io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
    at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:73)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:59)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:112)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler$1.channelRead(WebSocketServerProtocolHandler.java:147)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我所拥有的是简单的初始化程序,它直到TextWebsocketEncoder才能找到它:

public class ServerInitializer extends ChannelInitializer<Channel> {
    private final ChannelGroup group;

    public GameServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
        pipeline.addLast("textWebsocketDecoder",new TextWebsocketDecoder());
        pipeline.addLast("textWebsocketEncoder",new TextWebsocketEncoder());
    }
}
Run Code Online (Sandbox Code Playgroud)

TextWebSocketFrameHandler

public class TextWebSocketFrameHandler  extends SimpleChannelInboundHandler<TextWebSocketFrame>{
     private final ChannelGroup group;

        public TextWebSocketFrameHandler(ChannelGroup group) {
            this.group = group;
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

                ctx.pipeline().remove(HttpRequestHandler.class);

                group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));

                group.add(ctx.channel());

            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

        @Override
        public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
             ctx.fireChannelRead(msg);
            //group.writeAndFlush(msg.retain());
        }
}
Run Code Online (Sandbox Code Playgroud)

这是TextWebsocketDecoder和TextWebsocketEncoder:

TextWebsocketDecoder:

public class TextWebsocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame>
{

    @Override
    protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame frame, List<Object> out) throws Exception
    {
        String json = frame.text(); 
        JSONObject jsonObject = new JSONObject(json);
        int type = jsonObject.getInt("type");
        JSONArray msgJsonArray = jsonObject.getJSONArray("msg");
        String user = msgJsonArray.getString(0);
        String pass = msgJsonArray.getString(1);
        String connectionkey = msgJsonArray.getString(2);
        int timestamp = jsonObject.getInt("timestamp");

        JSONObject responseJson = new JSONObject();
        responseJson.put("type",Config.LOGIN_SUCCESS);
        responseJson.put("connectionkey",connectionkey);

        out.add(responseJson); // After This im getting the exception !!!
    }
}
Run Code Online (Sandbox Code Playgroud)

TextWebsocketEncoder

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class TextWebsocketEncoder extends MessageToMessageEncoder<JSONObject>
{

    @Override
    protected void encode(ChannelHandlerContext arg0, JSONObject arg1, List<Object> out) throws Exception {
        String json = arg1.toString();
        out.add(new TextWebSocketFrame(json));          
    } 

}
Run Code Online (Sandbox Code Playgroud)

Fer*_*big 3

例外情况

\n\n

在您的 TextWebSocketFrameHandler 内部,您正在调用ctx.fireChannelRead(msg);,这会将消息向上传递 1 条链,但MessageToMessageDecoder不准备处理此问题。为了解释这个问题,我需要解释一下 MessageToMessageDecoder 是如何工作的。

\n\n

MessageToMessageDecoder工作原理是捕获来自上游的每条消息并将它们传递给您的自定义代码,您的自定义代码处理该工作,mtmd 处理您传入的资源的关闭。

\n\n

由于您将引用传递给另一方,因此您实际上多次关闭了 WebSocketFrame,从而导致错误。MessageToMessageDecoder 甚至在javadoc中对此发出警告。

\n\n

为了解决这个问题,我们按照手册中的说明进行操作,并使我们的频道阅读以下内容:

\n\n
@Override\npublic void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {\n     msg.retain(); // ferrybig: fixed bug http://stackoverflow.com/q/34634750/1542723\n     ctx.fireChannelRead(msg);\n     //group.writeAndFlush(msg.retain());\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

不寄回问题

\n\n

在您的评论中,您指出代码不会发回任何内容。这是预期的,因为您的管道仅消耗数据并将其传递到链上。要解决此问题,需要对管道进行一些返工。

\n\n
    \n
  1. 我们需要交换 json-webframe 解码器和编码器的顺序:

    \n\n
    pipeline.addLast("textWebsocketDecoder",new TextWebsocketEncoder());\npipeline.addLast("textWebsocketEncoder",new TextWebsocketDecoder());\n
    Run Code Online (Sandbox Code Playgroud)\n\n

    这是因为您的解码器正在生成将发送回处理程序链 \xe2\x86\x91 的输出,如果解码器位于该输出之上,则编码器将看不到此输出。(您的解码器不应按照 netty 命名称为解码器)

  2. \n
  3. 我们需要更改您的解码器,以将生成的数据实际发送回链中的 \xe2\x86\x91 而不是 \xe2\x86\x93 到不存在的 void 中。

    \n\n

    为了进行这些更改,我们将让扩展TextWebSocketDecoderChannelInboundHandlerAdapter不是MessageToMessageDecoder<TextWebSocketFrame>因为我们正在处理消息而不是将它们传递给其他处理程序。

    \n\n

    我们正在将解码方法的签名更改为channelRead(ChannelHandlerContext ctx, Object msg),并添加一些样板代码:

    \n\n
    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */\n    TextWebSocketFrame frame = (TextWebSocketFrame) msg;\n    try {\n        /* Remaining code, follow the steps further of see end result */\n    } finally {\n        frame.release();\n    }\n}\n
    Run Code Online (Sandbox Code Playgroud)
  4. \n
  5. 我们调整代码以将结果向上传递而不是向下传递:

    \n\n
    public void channelRead(ChannelHandlerContext ctx, Object msg) /* throws Exception */\n    TextWebSocketFrame frame = (TextWebSocketFrame) msg;\n    try {\n\n        String json = frame.text(); \n        JSONObject jsonObject = new JSONObject(json);\n        int type = jsonObject.getInt("type");\n        JSONArray msgJsonArray = jsonObject.getJSONArray("msg");\n        String user = msgJsonArray.getString(0);\n        String pass = msgJsonArray.getString(1);\n        String connectionkey = msgJsonArray.getString(2);\n        int timestamp = jsonObject.getInt("timestamp");\n\n        JSONObject responseJson = new JSONObject();\n        responseJson.put("type",Config.LOGIN_SUCCESS);\n        responseJson.put("connectionkey",connectionkey);\n\n        ctx.writeAndFlush(responseJson)\n    } finally {\n        frame.release();\n    }\n}\n
    Run Code Online (Sandbox Code Playgroud)
  6. \n
\n\n

请注意,您可能会想从异常中删除我们之前的代码,但这样做会在 netty 的异步性质下运行时触发未定义的行为。

\n