是否可以在全双工 TCP 通信中使用 Netty?

2 java sockets tcp netty

似乎 Netty 只能处理单个 TCP 连接的读或写操作,但不能同时处理两者。我有一个客户端连接到用 Netty 编写的回显服务器应用程序并发送大约 20 万条消息。

回显服务器只接受客户端连接并发回客户端发送的任何消息。

问题是我不能让 Netty 在全双工模式下使用 TCP 连接。我想同时处理服务器端的读写操作。在我的情况下,Netty 从客户端读取所有消息,然后将它们发回,这导致了高延迟。

客户端应用程序为每个连接触发两个线程。一个用于任何写操作,另一个用于读操作。是的,客户端是用普通的旧 Java IO 风格编写的。

也许问题与我在服务器端设置的 TCP 选项有关:

    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, bufferWatermarkHigh)
    .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, bufferWatermarkLow)
    .childOption(ChannelOption.SO_RCVBUF, bufferInSize)
    .childOption(ChannelOption.SO_SNDBUF, bufferOutSize)
    .childOption(ChannelOption.SO_REUSEADDR, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childOption(ChannelOption.TCP_NODELAY, true);
Run Code Online (Sandbox Code Playgroud)

Fer*_*big 5

在您使用 github 存储库提供的示例中,有多个错误的地方:

  • 你直接从channelActive方法写

    在 netty 中,有一个策略是每个处理程序只同时执行一个传入方法,这是为了使开发更容易,并确保类的方法以正确的顺序执行,这样做也是为了确保方法的副作用在其他类中是可见的。

    • 您正在打印消息 channelReadComplete

    channelReadComplete在清除当前消息缓冲区后调用,channelRead在调用之前可能会调用多次。

    • 缺少成帧器

    构建消息或计算消息的大小是检测有多少字节进入的方法。对于客户端的 2 次写入可能是在没有此框架的情况下在服务器上读取 1 次,作为测试,我使用了new io.netty.handler.codec.FixedLengthFrameDecoder(120),因此我可以使用i++到达服务器和客户端的消息数量进行计数。

  • **使用重量级打印实现轻量级操作。

    根据我的分析器,大部分时间都花在通话上LOG.info(),记录器通常就是这种情况,因为它们在幕后做了很多工作,例如输出流的同步。通过使记录器每 1000 条消息只记录一次,我的速度得到了巨大的提高(而且由于我在双核上运行,所以计算机真的很慢......)

  • 重发码

    发送代码ByteBuf每次都会重新创建。通过重用,ByteBuf您可以进一步提高发送速度,您可以通过创建 ByteBuf 1 次,然后.retain()在每次传递时调用它来实现此目的。

    这可以通过以下方式轻松完成:

    ByteBuf buf = createMessage(MESSAGE_SIZE);
    for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
        ctx.writeAndFlush(buf.retain());
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 减少冲洗量

    通过减少刷新量,您可以获得更高的本机性能。每次调用 flush() 都是调用网络堆栈以发送待处理消息。如果我们将该规则应用于上面的代码,它将给出以下代码:

    ByteBuf buf = createMessage(MESSAGE_SIZE);
    for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
        ctx.write(buf.retain());
    }
    ctx.flush();
    
    Run Code Online (Sandbox Code Playgroud)

最终代码

有时,您只想查看结果并亲自尝试:

App.java(不变)

public class App {
  public static void main( String[] args ) throws InterruptedException {
    final int PORT = 8080;
    runInSeparateThread(() -> new Server(PORT));
    runInSeparateThread(() -> new Client(PORT));
  }
  private static void runInSeparateThread(Runnable runnable) {
    new Thread(runnable).start();
  }
}
Run Code Online (Sandbox Code Playgroud)

客户端.java

public class Client {
  public Client(int port) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      ChannelFuture channelFuture = createBootstrap(group).connect("192.168.171.102", port).sync();
      channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      group.shutdownGracefully();
    }
  }
  private Bootstrap createBootstrap(EventLoopGroup group) {
    return new Bootstrap().group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(
            new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
                ch.pipeline().addLast(new ClientHandler());
              }
            }
        );
  }
}
Run Code Online (Sandbox Code Playgroud)

客户端处理程序

public class ClientHandler extends ChannelInboundHandlerAdapter {
  private final Logger LOG = LoggerFactory.getLogger(ClientHandler.class.getSimpleName());
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final int MESSAGE_SIZE = 200;
    final int NUMBER_OF_MESSAGES = 200000;
    new Thread(()->{
    ByteBuf buf = createMessage(MESSAGE_SIZE);
    for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
      ctx.writeAndFlush(buf.retain());
    }}).start();
  }
  int i;
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if(i++%10000==0)
    LOG.info("Got a message back from the server "+(i));
    ((io.netty.util.ReferenceCounted)msg).release();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
  private ByteBuf createMessage(int size) {
    ByteBuf message = Unpooled.buffer(size);
    for (int i = 0; i < size; ++i) {
      message.writeByte((byte) i);
    }
    return message;
  }
}
Run Code Online (Sandbox Code Playgroud)

服务器端.java

public class Server {
  public Server(int port) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ChannelFuture channelFuture = createServerBootstrap(bossGroup, workerGroup).bind(port).sync();
      channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  private ServerBootstrap createServerBootstrap(EventLoopGroup bossGroup,
                                                EventLoopGroup workerGroup) {
    return new ServerBootstrap().group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new io.netty.handler.codec.FixedLengthFrameDecoder(200));
             ch.pipeline().addLast(new ServerHandler());
          }
        });
  }
}
Run Code Online (Sandbox Code Playgroud)

服务器处理程序

public class ServerHandler extends ChannelInboundHandlerAdapter {
  private final Logger LOG = LoggerFactory.getLogger(ServerHandler.class.getSimpleName());
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.writeAndFlush(msg).addListener(f->{if(f.cause()!=null)LOG.info(f.cause().toString());});
    if(i++%10000==0)
    LOG.info("Send the message back to the client "+(i));
    ;
  }
  int i;
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   // LOG.info("Send the message back to the client "+(i++));
    ctx.flush();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}
Run Code Online (Sandbox Code Playgroud)

检测结果

我决定测试如果我改变记录的 incmoing 消息的频率会发生什么,这些是测试结果:

ByteBuf buf = createMessage(MESSAGE_SIZE);
for (int i = 0; i < NUMBER_OF_MESSAGES; ++i) {
    ctx.writeAndFlush(buf.retain());
}
Run Code Online (Sandbox Code Playgroud)

* 时间应该是一粒盐,没有进行真正的基准测试,只有 1 次快速运行程序

这表明通过减少对日志的调用量(精度),我们可以获得更好的传输速率(速度)。