处理ReadTimeoutHandler超时

dan*_*nik 0 java nio netty

我只是无法理解为什么我的读取时间不起作用.我想要做的就是等待
10秒钟让某个线程将消息发送到,BlockedQueue<String>并在超时时返回客户端上的某种响应.

public class NioAsynChatPipelineFactory implements ChannelPipelineFactory {

     private static Timer timer = new HashedWheelTimer();
     private final ChannelHandler timeoutHandler = new ReadTimeoutHandler(timer, 10);

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
         pipeline.addLast("decoder", new HttpRequestDecoder());
         pipeline.addLast("encoder", new HttpResponseEncoder());
         pipeline.addLast("handler", new NioAsynChatHandler());
         pipeline.addLast("timeout", this.timeoutHandler);
        return pipeline;
    }

}
Run Code Online (Sandbox Code Playgroud)

现在我的处理程序看起来像这样

public class NioAsynChatHandler extends SimpleChannelUpstreamHandler{

     @Override
     public void handleUpstream(
        ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
               super.handleUpstream(ctx, e);
     }

 @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
         System.out.println("Exception");
        \\writing some kind of response and closing channel.
    }

    @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

    Thread thread = new Thread(new ConsmerTask(e.getChannel()));
                thread.start();

}
Run Code Online (Sandbox Code Playgroud)

而在ConsumerTask中,我只是在等待BlockingQueue获得响应

public class ConsumerTask implements Runnable{

    private Channel channel;


public ConsumerTask(Channel channel){
        this.channel = channel;
}

    @Override
    public void run() {
         try{
            while(true){
              String message = queue.take();
            }
         } catch(InterruptedException ex){
           Thread.currentThread.interrupt();
         } finally{
         //write something to channel and close it   
        }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是,我没有看到任何超时发生的重复.我究竟做错了什么?

更新:

 public static final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
Run Code Online (Sandbox Code Playgroud)

其实我的问题更通用,如何在外部线程中等待某些东西时关闭超时通道?

更新2: 另一个问题:由于我在Cha中运行外部线程的事实,OrderedMemoryAwareThreadPoolExecutor在管道中使用会更好吗?它会提高性能吗?

tru*_*tin 5

这基本上是因为你把它置于ReadTimeoutHandler错误的位置.请把它放在管道的第一个位置(即在所有处理程序之前).