我想写一个基于netty的客户端.它应该有方法public String send(String msg); 哪个应该从服务器或某个未来返回响应 - doesen't不重要.它也应该是多线程的.像这样:
public class Client {
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
}
private Channel channel;
public Client() throws InterruptedException {
EventLoopGroup loopGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder()).
addLast(new StringEncoder()).
addLast(new ClientHandler());
}
});
channel = b.connect("localhost", 9091).sync().channel();
}
public String sendMessage(String msg) {
channel.writeAndFlush(msg);
return ??????????;
}
Run Code Online (Sandbox Code Playgroud)
}
我不知道如何在调用writeAndFlush()之后从服务器检索响应; 我该怎么办? …
我们已经实现了对Netty事件循环队列的监控,以便了解我们的一些Netty模块的问题.监视器使用的io.netty.util.concurrent.SingleThreadEventExecutor#pendingTasks方法适用于大多数模块,但对于每秒处理几千个HTTP请求的模块,它似乎挂起或非常慢.我现在意识到文档严格规定这可能是一个问题,我觉得很蹩脚......所以我正在寻找另一种方法来实现这个监视器.
你可以在这里看到旧代码:https: //github.com/outbrain/ob1k/blob/6364187b30cab5b79d64835131d9168c754f3c09/ob1k-core/src/main/java/com/outbrain/ob1k/common/metrics/NettyQueuesGaugeBuilder.java
public static void registerQueueGauges(final MetricFactory factory, final EventLoopGroup elg, final String componentName) {
int index = 0;
for (final EventExecutor eventExecutor : elg) {
if (eventExecutor instanceof SingleThreadEventExecutor) {
final SingleThreadEventExecutor singleExecutor = (SingleThreadEventExecutor) eventExecutor;
factory.registerGauge("EventLoopGroup-" + componentName, "EventLoop-" + index, new Gauge<Integer>() {
@Override
public Integer getValue() {
return singleExecutor.pendingTasks();
}
});
index++;
}
}
}
Run Code Online (Sandbox Code Playgroud)
我的问题是,是否有更好的方法来监控队列大小?
这可能是一个非常有用的指标,因为它可用于理解延迟,并且在某些情况下也可用于施加背压.
我正在使用Cassandra,在启动过程中,Netty会打印一个带有堆栈跟踪的警告:
在类路径中找到Netty的本地epoll传输,但epoll不可用.改用NIO."
应用程序正常工作,但有没有办法解决警告?
这是完整的堆栈跟踪:
16:29:46 WARN com.datastax.driver.core.NettyUtil - Found Netty's native epoll transport in the classpath, but epoll is not available. Using NIO instead.
java.lang.UnsatisfiedLinkError: no netty-transport-native-epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:168)
at io.netty.channel.epoll.Native.<clinit>(Native.java:49)
at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:30)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.datastax.driver.core.NettyUtil.<clinit>(NettyUtil.java:68)
at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:101)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:709)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1386)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:341)
at com.datastax.driver.core.Cluster.connect(Cluster.java:286)
at org.springframework.cassandra.config.CassandraCqlSessionFactoryBean.connect(CassandraCqlSessionFactoryBean.java:100)
at org.springframework.cassandra.config.CassandraCqlSessionFactoryBean.afterPropertiesSet(CassandraCqlSessionFactoryBean.java:94)
at org.springframework.data.cassandra.config.CassandraSessionFactoryBean.afterPropertiesSet(CassandraSessionFactoryBean.java:60)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1642)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1579)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) …Run Code Online (Sandbox Code Playgroud) 我现在正尝试使用双向SSL握手设置Netty,其中客户端和服务器都提供并验证证书.
这似乎没有在SslHandler中实现.有没有人这样做?我想它会进入SslHandler.handshake操作并被委托给javax.net.ssl.SSLEngine?
任何提示/提示/预先存在的实现?
谢谢!
ANSWER(stackoverflow不会让我以正常的方式发布它)我发现如果我在设置我的SslHandler之前在SSLEngine对象上设置了needClientAuth标志,那就解决了这个问题!
我正在使用Netty(通过Ning异步HTTP 库)通过HTTP检索文档.这会在控制台上产生大量的调试输出,如下面列出的单个文档请求.
任何人都知道如何关闭它?我真的不需要看到这个输出.
我打电话给Scala,如果这有任何区别的话.
15:07:14.273 [run-main] DEBUG c.n.h.c.p.n.NettyAsyncHttpProvider -
Non cached request
DefaultHttpRequest(chunked: false)
GET /api/search.json?q=foo HTTP/1.1
Host: www.documentcloud.org
Connection: keep-alive
Accept: */*
User-Agent: NING/1.0
using Channel
[id: 0x2839ca40]
15:07:14.930 [New I/O client worker #1-1] DEBUG c.n.h.c.p.n.NettyAsyncHttpProvider -
Request DefaultHttpRequest(chunked: false)
GET /api/search.json?q=foo HTTP/1.1
Host: www.documentcloud.org
Connection: keep-alive
Accept: */*
User-Agent: NING/1.0
Response DefaultHttpResponse(chunked: true)
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 10477
Connection: keep-alive
Vary: Accept-Encoding
Status: 200
X-Powered-By: Phusion Passenger (mod_rails/mod_rack) 3.0.13
ETag: "4f8f766d639dd84d014dfee3abb45de2"
X-Runtime: 611
Cache-Control: …Run Code Online (Sandbox Code Playgroud) 我有一个netty解码器,它使用GSon将来自Web客户端的JSon转换为适当的java对象.要求是:客户端可以发送不相关的类,A类,B类,C类等,但我想在管道中使用相同的单例解码器实例进行转换(因为我使用spring进行配置).我面临的问题是我需要class事先知道对象.
public Object decode()
{
gson.fromJson(jsonString, A.class);
}
Run Code Online (Sandbox Code Playgroud)
这不能解码B或C.我的库的用户现在需要为每个类编写单独的解码器,而不是稍后编译.我可以看到这样做的唯一方法是从Web客户端传递JSon字符串中的类名称"org.example.C",在解码器中解析它然后Class.forName用来获取类.有一个更好的方法吗?
我想了解Akka和Netty之间的区别.我知道你可以使用Scala和Java.我更感兴趣的是知道Netty哪里更好(如果有的话)和Akka更好(如果有的话).它们在哪里重叠,换句话说,我可以在哪些区域使用Akka而不是Netty,反之亦然.
grpc-java在ServerBuilder其中使用了一个执行程序,如果没有由该builder.executor()方法定义,则默认情况下使用静态缓存线程池.这个遗嘱执行人的确切用途是什么?它只是执行处理程序方法还是执行"其他"操作?
另外,grpc如何定义网络工作者EventLoopGroup?具体来说,我想知道如何将工作线程分配给此工作组.是否有线程数的默认值,或者它是机器核心数的函数?另外,关于上述问题,这些网络工作者如何与遗嘱执行人合作?他们只处理I/O - 读取和写入通道吗?
编辑:Netty,默认创建(2*个核心数)工作线程.
我的要求:
我想检测通道是否空闲以便读取一段时间并希望基于此超时.My Netty客户端正在向1000台服务器发送请求.
问题:我的Netty客户端永远不会显示如果有时候有任何空闲信道,即使我使用的是一些总是超时的ips.我怀疑我没有正确实现IdleStateHandler.我试过减少IdleStateHandler的读取超时但没有运气.我花了好几个小时搞清楚这一点.任何帮助将非常感激.
所以,下面是我的代码:
My Netty Client:
public void connect(final InetAddress remoteAddress){
new Bootstrap()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(httpNettyClientChannelInitializer)
.connect(remoteAddress, serverPort)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.cancel(!future.isSuccess());
}
});
}
Run Code Online (Sandbox Code Playgroud)
My Netty Channel Initalizer:
public class HttpNettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private final Provider<HttpNettyClientChannelHandler> handlerProvider;
private final int timeout;
private int maxContentLength;
@Inject
public HttpNettyClientChannelInitializer(Provider<HttpNettyClientChannelHandler> handlerProvider,
@Named("readResponseTimeout") int timeout, @Named("maxContentLength") int maxContentLength) {
this.handlerProvider = handlerProvider;
this.timeout = timeout;
this.maxContentLength = maxContentLength;
} …Run Code Online (Sandbox Code Playgroud)