我使用的是Netty 3.2.7.我正在尝试在我的客户端编写功能,这样如果在一定时间(例如30秒)之后没有写入消息,则会向服务器发送"保持活动"消息.
经过一番挖掘,我发现WriteTimeoutHandler应该让我这样做.我在这里找到了这个解释:https://issues.jboss.org/browse/NETTY-79.
Netty文档中给出的示例是:
public ChannelPipeline getPipeline() {
// An example configuration that implements 30-second write timeout:
return Channels.pipeline(
new WriteTimeoutHandler(timer, 30), // timer must be shared.
new MyHandler());
}
Run Code Online (Sandbox Code Playgroud)
在我的测试客户端,我做到了这一点.在MyHandler中,我还覆盖了exceptionCaught()方法:
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
if (e.getCause() instanceof WriteTimeoutException) {
log.info("Client sending keep alive!");
ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length());
keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes());
Channels.write(ctx, Channels.future(e.getChannel()), keepAlive);
}
}
Run Code Online (Sandbox Code Playgroud)
无论客户端在什么时间内没有向通道写入任何内容,我从未调用过覆盖的exceptionCaught()方法.
查看WriteTimeoutHandler的源代码,其writeRequested()实现是:
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
long timeoutMillis = getTimeoutMillis(e);
if (timeoutMillis > 0) {
// Set timeout only when getTimeoutMillis() returns a positive value.
ChannelFuture future = e.getFuture();
final Timeout timeout = timer.newTimeout(
new WriteTimeoutTask(ctx, future),
timeoutMillis, TimeUnit.MILLISECONDS);
future.addListener(new TimeoutCanceller(timeout));
}
super.writeRequested(ctx, e);
}
Run Code Online (Sandbox Code Playgroud)
在这里,似乎这个实现说,"当请求写入时,进行新的超时.当写入成功时,取消超时."
使用调试器,似乎这就是正在发生的事情.写入完成后,超时将被取消.这不是我想要的行为.我想要的行为是:"如果客户端没有向通道写入任何信息30秒,则抛出WriteTimeoutException."
那么,这不是WriteTimeoutHandler的用途吗?这就是我从在线阅读的内容中解释它的方法,但实现似乎并不是这样.我用错了吗?我应该用别的吗?在我试图重写的同一客户端的Mina版本中,我看到sessionIdle()方法被覆盖以实现我想要的行为,但是这种方法在Netty中不可用.
use*_*153 12
对于Netty 4.0及更高版本,您应该像IdleStateHandler文档中的示例一样扩展ChannelDuplexHandler:
// An example that sends a ping message when there is no outbound traffic
// for 30 seconds. The connection is closed when there is no inbound traffic
// for 60 seconds.
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我建议添加IdleStateHandler,然后添加你的自定义实现IdleStateAwareUpstreamHandler它可以在空闲状态下的反应.在许多不同的项目中,这对我来说非常好.
javadocs列出了以下示例,您可以将其用作实现的基础:
public class MyPipelineFactory implements ChannelPipelineFactory {
private final Timer timer;
private final ChannelHandler idleStateHandler;
public MyPipelineFactory(Timer timer) {
this.timer = timer;
this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0);
// timer must be shared.
}
public ChannelPipeline getPipeline() {
return Channels.pipeline(
idleStateHandler,
new MyHandler());
}
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends IdleStateAwareChannelHandler {
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
if (e.getState() == IdleState.READER_IDLE) {
e.getChannel().close();
} else if (e.getState() == IdleState.WRITER_IDLE) {
e.getChannel().write(new PingMessage());
}
}
}
ServerBootstrap bootstrap = ...;
Timer timer = new HashedWheelTimer();
...
bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
11470 次 |
| 最近记录: |