Netty 消息接收超时

use*_*577 5 java timeout netty

我需要我的 ChannelHandler 中的 messageReceived(或 netty 4.0 的 channelRead0)方法在某个时间阈值后超时。我尝试过 Read/WriteTimeoutHandlers,但是当我的 messageReceived 处理时间超过超时时无法生成异常。这是我尝试过的:

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpObjectAggregator(1048576));
        p.addLast(new HttpContentCompressor());
        p.addLast("readTimeoutHandler", new ReadTimeoutHandler(1));
        //p.addLast("idleTimeoutHandler", new IdleStateHandler(1, 1, 1));
        p.addLast(new HttpRequestHandler());
      }
  }



public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  public static class Call implements Callable<Boolean> {
    public Boolean call() {
        for(long i = 0; i<100000000;i++){
            for(long j = 0; j<100;j++){

            }
        }
        return true;        
    }
 }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    System.out.println("** Exception caught **");
    if (cause instanceof ReadTimeoutException) {
       System.out.println("*** Request timed out ***");
    } 
    else if (cause instanceof WriteTimeoutException) {
           System.out.println("*** Request timed out on write ***");
        } 
    cause.printStackTrace();
    ctx.close();
  }

   @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        FullHttpRequest request = this.request = (FullHttpRequest) msg;
        /*Callable<Boolean> callable = new Call();

    ScheduledExecutorService scheduler =
             Executors.newScheduledThreadPool(1);

    ScheduledFuture<Boolean> handle = scheduler.schedule(callable, 4, TimeUnit.SECONDS);
    boolean v  = handle.get();*/
    for(long i = 0; i<100000000;i++){
        for(long j = 0; j<100;j++){

        }
    }
    System.out.println("Wait done");

        try{
            CharSequence content = appController.handleRequest(
                    url,
                    ctx.channel().remoteAddress().toString(),
                    parseURL(url), reqBody);
            if(content!=null){
                writeResponse(ctx, HttpResponseStatus.OK, content);
            }
        }catch(AppRuntimeException e){
            CharSequence content = e.getMessage(); 
            if(content != null){
                OptHttpStatus status = e.getOptHttpStatus();
                writeResponse(ctx, HttpResponseStatus.valueOf(status.getCode()), content);
            }
        }

   private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, CharSequence content) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);

    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, 
            status,
            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
            //Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); 
    response.headers().set("Access-Control-Allow-Origin", "*");

    if (keepAlive) {
        //Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }
    // Write the response.
    ChannelFuture ch = ctx.writeAndFlush(response);

    if(!keepAlive){
        ch.addListener(ChannelFutureListener.CLOSE);
    }
}
    }
Run Code Online (Sandbox Code Playgroud)

我在 channelRead0 方法中添加了一个用于模拟等待的冗余 for 循环来模拟长处理时间。但是没有产生超时异常。我也试过安排等待,但没有得到超时异常你能提出任何解决方案吗?

Chr*_*ole 2

您尝试执行的操作有两个问题,一个是由于线程休眠而不是通过调度使用异步延迟引起的,另一个是您需要返回使用ReadTimeoutHandler

不要睡觉或阻塞

相反,Thread.sleep你为什么不尝试延迟安排你的工作呢?使用 Netty,您的超时将发生在您发送到睡眠状态的同一个线程中,因此您最终仍然会在超时检查发生之前编写响应。如果您安排延迟,则线程可以自由地检测超时并触发异常。

请记住,netty 对多个通道使用一个 IO 线程,因此您不应该在该线程中执行任何阻塞/同步工作。Thread.sleep、繁忙循环、同步调用、阻塞调用(如Future.get())不应该在 IO 线程中进行,因为这会影响其他通道的性能。

您可以使用上下文联系执行者来安排您的延迟工作。

public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
    ctx.executor().schedule(new Runnable() {
        @Override
        public void run() {
            // Put your try/catch in here
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
Run Code Online (Sandbox Code Playgroud)

如果您必须拨打阻止电话

如果您无法避免进行阻塞调用或进行一些密集处理,请在添加处理处理程序时使用不同的 EventExecutorGroup,以允许从 IO 工作线程异步完成该工作。请务必为其提供足够的线程来满足您的预期工作负载和连接数量。

下面的示例代码应该适用于您的 Thread.sleep 或繁忙循环。请务必使用满足您需求的数字定义/​​替换 OFFLOAD_THREADS 。

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    private final EventExecutorGroup executors = new DefaultEventExecutorGroup(OFFLOAD_THREADS);
    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpObjectAggregator(1048576));
        p.addLast(new HttpContentCompressor());
        p.addLast("idleTimeoutHandler", new IdleStateHandler(0, 1, 0));
        p.addLast(executors, new HttpRequestHandler());
    }
}
Run Code Online (Sandbox Code Playgroud)

使用空闲状态处理程序

如果写入时间过长,WriteTimeoutHandler 就会超时。在您开始第一次写入之前,它甚至不会开始计时。看起来您试图在写入开始之前造成延迟,因此即使您按照上面的建议停止使用睡眠, WriteTimeoutHandler 也不会为您触发。

如果您确实想超时​​确定开始编写所需的时间,那么您应该使用IdleStateHandler并处理它触发的用户事件。与 WriteTimeoutHandler 不同,IdleStateHandler 将在通道变为活动状态时开始计数,而不是等待写入开始,因此如果您的处理时间太长(但仅当您进行异步处理时),它将触发。

确保在使用 IdleStateHandler 时捕获用户事件并对用户事件做出反应

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
        // Handle the event here
    }
    else {
        super.userEventTriggered(ctx, evt);
    }
}
Run Code Online (Sandbox Code Playgroud)