Kay*_*ayJ 6 java client-server netty
我正在尝试使用简单的服务器 - 客户端应用程序进入Netty(代码见下文).
我正在努力解决两个问题:
ConfigServerHandler和.正确调用ConfigClientHandler.但是FeedbackServerHandler分别是 从不调用FeedbackClientHandler.为什么?根据文档,应该一个接一个地调用处理程序.
我想要几个处理程序.这些处理程序中的每一个仅对另一方发送的一些消息感兴趣(例如,由客户端发送,由服务器接收).
谢谢你的帮助!
KJ
这是服务器的创建方式:
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ConfigServerHandler(),
new FeedbackServerHandler());
}
});
b.bind(mPort).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
Run Code Online (Sandbox Code Playgroud)
其中一个Handler类(FeedbackServerHandler完全相同但解析为Integer):
public class ConfigServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigServerHandler::channelRead, " +(String)msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Run Code Online (Sandbox Code Playgroud)
客户端看起来非常相似:
public Client(String host, int port) throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ConfigClientHandler(),
new FeedbackClientHandler());
}
});
b.connect(host, port).sync().channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个客户端处理程序(另一个发送Integer消息并在'channelRead'方法中解析为Integer):
public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
private final String firstMessage = "blubber";
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("ConfigClientHandler::channelActive");
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
Run Code Online (Sandbox Code Playgroud)
}
你正在使用ChannelInboundHandlerAdapter,这对你的"中间"处理程序来说很好ConfigXxxxxHandler.
但是你使用channelRead方法然后在里面使用ctx.write(msg).ctx.write(msg)将通过前一个处理程序first(ObjectDecoder)将msg写回另一个服务器,而不是下一个处理程序(FeedbackClientHandler在您的情况下).
如果要将消息发送到下一个处理程序,则应使用以下内容:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
ctx.fireChannelRead(msg);
}
Run Code Online (Sandbox Code Playgroud)
,当然没有ctx.flush()在channelReadComplete(因为没有更多的写有).但是在你的决赛中FeedbackClientHandler,当然要使用flush方法ctx.write(yourNewMessage)或使用ctx.writeAndFlush(yourNewMessage).
所以要恢复:
ctx.write将消息发送到线路,所以到前一个处理器下行到通道然后到网络,所以出站方式ctx.fireChannelRead将消息发送到下一个处理程序(相反的方式),所以入站方式有关详细信息,请参见http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-16.
您也许应该反转编码器/解码器,因为通常首先是解码器,然后是流水线中的编码器.
p.addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEncoder(),
new ConfigClientHandler(),
new FeedbackClientHandler());
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3410 次 |
| 最近记录: |