我尝试创建一个代理,然后传递Netty中的所有trafic其他处理程序.我知道我应该管理对ByteBuf的引用,但我无法理解如何做到这一点.我的例子和例外如下.
初始化:
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final String remoteHost;
private final int remotePort;
public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.sslCtx = sslCtx;
}
public HexDumpProxyInitializer(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.sslCtx = null;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
p.addLast(new InboundPrinterHandler());
}
}
Run Code Online (Sandbox Code Playgroud)
HexDumpProxyFrontendHandler
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}
Run Code Online (Sandbox Code Playgroud)
InboundPrinterHandler
public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf bb = null;
bb = (ByteBuf) msg;
System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset()));
System.out.println("\n\n\n");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
}
Run Code Online (Sandbox Code Playgroud)
例外
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
Run Code Online (Sandbox Code Playgroud)if (outboundChannel.isActive()) { outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { // Snip } ctx.fireChannelRead(msg);
将ByteBuf传递给另一个通道后,其他通道的责任是再次减少引用计数.因为另一个通道现在递减了引用计数,所以它现在不可用.
解决此问题的最佳方法是在使用.retain()以下方法将流量传递到其他通道之前手动递增值:
outboundChannel.writeAndFlush(msg.retain()).addListener(new ChannelFutureListener() {
// Your remaining code
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5364 次 |
| 最近记录: |