使用Netty在UDP服务器中丢失了大量UDP请求

The*_*ers 6 multithreading udp nio threadpool netty

我用Netty编写了一个简单的UDP服务器,它只是在日志中打印出收到的消息(帧).为此,我创建了一个简单的帧解码器解码器和一个简单的消息处理程序.我还有一个客户端可以顺序和/或并行发送多个请求.

当我配置我的客户端测试程序按顺序发送几百个请求时,它们之间有一点延迟,我用Netty编写的服务器会正​​确接收它们.但是目前我增加了我的客户端中的同时请求数(例如100个),加上顺序请求和少量重复,我的服务器开始丢失许多请求.例如,当我发送50000个请求时,我的服务器仅在使用打印出收到的消息的简单ChannelHandler时才会收到大约49000个.

当我在这个处理程序前添加简单的帧解码器(打印出帧并将其复制到另一个缓冲区)时,服务器只处理一半的请求!!

我注意到,无论我为创建的NioDatagramChannelFactory指定的worker数量,总有一个且只有一个线程处理请求(我使用推荐的Executors.newCachedThreadPool()作为另一个参数).

我还创建了另一个类似的简单UDP服务器,它基于与JDK一起提供的DatagramSocket,它可以完美地处理每个请求,丢失0(零)!! 当我在我的客户端发送50000个请求(例如1000个线程)时,我在服务器中收到了50000个请求.

使用Netty配置UDP服务器时我做错了什么?或许Netty根本不是为了支持这种负载?为什么给定的缓存线程池只使用一个线程(我注意到只有一个线程,并且通过查看JMX jconsole并通过检查输出日志中的线程名称来使用它)?我认为如果更多线程按预期使用,服务器将能够轻松处理此类负载,因为我可以在不使用Netty时毫无问题地执行此操作!

请参阅下面的初始化代码:

...

lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );

lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline()
    {
        ChannelPipeline lChannelPipeline = Channels.pipeline();
        lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );            
        lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );            
        return lChannelPipeline;
    }
} );

bindChannel = lBootstrap.bind( socketAddress );

...
Run Code Online (Sandbox Code Playgroud)

以及解码器中decode()方法的内容:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
    ChannelBuffer lDuplicatedChannelBuffer = null;
    sLogger.debug( "Decode method called." );

    if ( iBuffer.readableBytes() < 8 ) return null;
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( iBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );                     
        lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );            
        iBuffer.readBytes( lDuplicatedChannelBuffer );
    }

    return lDuplicatedChannelBuffer;
}
Run Code Online (Sandbox Code Playgroud)

以及我的处理程序中的messageReceived()方法的内容:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( lMessageBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );            
        lMessageBuffer.discardReadBytes();
    }
}
Run Code Online (Sandbox Code Playgroud)

Jes*_*jan 7

您尚未正确配置ConnectionlessBootstrap实例.

  1. 您必须使用最佳值配置以下内容.

    SO_SNDBUF大小,SO_RCVBUF大小和ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
     new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE));
    
    Run Code Online (Sandbox Code Playgroud)

    检查DefaultNioDatagramChannelConfig类以获取更多详细信息.

  2. 管道正在使用Netty工作线程完成所有工作.如果工作线程被重载,它将延迟选择器事件循环的执行,并且在读/写通道时会出现瓶颈.您必须在管道中添加如下的执行处理程序.它将释放工作者线程来完成自己的工作.

    ChannelPipeline lChannelPipeline = Channels.pipeline();
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
      new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
    
    //add rest of the handlers here
    
    Run Code Online (Sandbox Code Playgroud)