The*_*ers 6 multithreading udp nio threadpool netty
我用Netty编写了一个简单的UDP服务器,它只是在日志中打印出收到的消息(帧).为此,我创建了一个简单的帧解码器解码器和一个简单的消息处理程序.我还有一个客户端可以顺序和/或并行发送多个请求.
当我配置我的客户端测试程序按顺序发送几百个请求时,它们之间有一点延迟,我用Netty编写的服务器会正确接收它们.但是目前我增加了我的客户端中的同时请求数(例如100个),加上顺序请求和少量重复,我的服务器开始丢失许多请求.例如,当我发送50000个请求时,我的服务器仅在使用打印出收到的消息的简单ChannelHandler时才会收到大约49000个.
当我在这个处理程序前添加简单的帧解码器(打印出帧并将其复制到另一个缓冲区)时,服务器只处理一半的请求!!
我注意到,无论我为创建的NioDatagramChannelFactory指定的worker数量,总有一个且只有一个线程处理请求(我使用推荐的Executors.newCachedThreadPool()作为另一个参数).
我还创建了另一个类似的简单UDP服务器,它基于与JDK一起提供的DatagramSocket,它可以完美地处理每个请求,丢失0(零)!! 当我在我的客户端发送50000个请求(例如1000个线程)时,我在服务器中收到了50000个请求.
使用Netty配置UDP服务器时我做错了什么?或许Netty根本不是为了支持这种负载?为什么给定的缓存线程池只使用一个线程(我注意到只有一个线程,并且通过查看JMX jconsole并通过检查输出日志中的线程名称来使用它)?我认为如果更多线程按预期使用,服务器将能够轻松处理此类负载,因为我可以在不使用Netty时毫无问题地执行此操作!
请参阅下面的初始化代码:
...
lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );
lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline()
{
ChannelPipeline lChannelPipeline = Channels.pipeline();
lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );
lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );
return lChannelPipeline;
}
} );
bindChannel = lBootstrap.bind( socketAddress );
...
Run Code Online (Sandbox Code Playgroud)
以及解码器中decode()方法的内容:
protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
ChannelBuffer lDuplicatedChannelBuffer = null;
sLogger.debug( "Decode method called." );
if ( iBuffer.readableBytes() < 8 ) return null;
if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();
if ( iBuffer.readable() )
{
sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );
lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );
iBuffer.readBytes( lDuplicatedChannelBuffer );
}
return lDuplicatedChannelBuffer;
}
Run Code Online (Sandbox Code Playgroud)
以及我的处理程序中的messageReceived()方法的内容:
public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();
if ( lMessageBuffer.readable() )
{
sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );
lMessageBuffer.discardReadBytes();
}
}
Run Code Online (Sandbox Code Playgroud)
您尚未正确配置ConnectionlessBootstrap实例.
您必须使用最佳值配置以下内容.
SO_SNDBUF大小,SO_RCVBUF大小和ReceiveBufferSizePredictorFactory
lBootstrap.setOption("sendBufferSize", 1048576);
lBootstrap.setOption("receiveBufferSize", 1048576);
lBootstrap.setOption("receiveBufferSizePredictorFactory",
new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE));
Run Code Online (Sandbox Code Playgroud)
检查DefaultNioDatagramChannelConfig类以获取更多详细信息.
管道正在使用Netty工作线程完成所有工作.如果工作线程被重载,它将延迟选择器事件循环的执行,并且在读/写通道时会出现瓶颈.您必须在管道中添加如下的执行处理程序.它将释放工作者线程来完成自己的工作.
ChannelPipeline lChannelPipeline = Channels.pipeline();
lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
//add rest of the handlers here
Run Code Online (Sandbox Code Playgroud)