Java高负载NIO TCP服务器

Jur*_*riy 18 java high-load nio tcp

作为我研究的一部分,我正在用Java编写一个高负载的TCP/IP echo服务器.我想为大约3-4k的客户端提供服务,并且每秒可以看到我可以挤出的最大可能消息.消息大小非常小 - 最多100个字节.这项工作没有任何实际目的 - 只是一项研究.

根据我所见过的众多演讲(HornetQ基准测试,LMAX Disruptor会谈等),现实世界的高负载系统往往每秒服务数百万次交易(我相信Disruptor提到大约6密尔和大黄蜂 - 8.5).例如,这篇文章指出可以达到高达40M MPS.所以我把它作为现代硬件应该具备的粗略估计.

我编写了最简单的单线程NIO服务器并启动了负载测试.我很惊讶我在本地主机上只能获得大约10万MPS,在实际网络中只能获得25k MPS.数字看起来很小.我正在测试Win7 x64,核心i7.查看CPU负载 - 只有一个核心正忙(在单线程应用程序上预期),而其余核心处于空闲状态.然而,即使我加载所有8个核心(包括虚拟),我的MPS也不会超过800k - 甚至不到接近4千万:)

我的问题是:向客户提供大量消息的典型模式是什么?我应该在单个JVM内的几个不同套接字上分配网络负载,并使用某种负载均衡器(如HAProxy)将负载分配到多个内核吗?或者我应该考虑在我的NIO代码中使用多个选择器?或者甚至可能在多个JVM之间分配负载并使用Chronicle在它们之间建立进程间通信?在像CentOS这样适当的服务器端操作系统上进行测试会产生很大的不同(可能是Windows会减慢速度)吗?

下面是我的服务器的示例代码.对于任何传入的数据,它总是以"ok"回答.我知道在现实世界中,我需要跟踪消息的大小,并准备好一条消息可能在多次读取之间分配,但是我现在想让事情变得非常简单.

public class EchoServer {

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

private long loopTime;
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    socketChannel.register(selector, SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}
Run Code Online (Sandbox Code Playgroud)

Raj*_*jiv 21

what is a typical pattern for serving massive amounts of messages to clients?
Run Code Online (Sandbox Code Playgroud)

有许多可能的模式:使用所有核心而不经过多个jvms的简单方法是:

  1. 让一个线程接受连接并使用选择器进行读取.
  2. 一旦有足够的字节构成单个消息,使用类似环形缓冲区的构造将其传递给另一个核心.Disruptor Java框架非常适合这种情况.如果需要知道什么是完整消息的处理是轻量级的,那么这是一个很好的模式.例如,如果你有一个长度前缀协议,你可以等到你得到预期的字节数,然后将其发送到另一个线程.如果协议的解析非常繁重,那么您可能会压倒这个单一线程,阻止它接受连接或读取网络的字节.
  3. 在从环形缓冲区接收数据的工作线程上,进行实际处理.
  4. 您可以在工作线程上或通过其他聚合器线程写出响应.

这是它的要点.这里有更多的可能性,答案实际上取决于您正在编写的应用程序类型.一些例子是:

  1. CPU重型无状态应用程序称为图像处理应用程序.每个请求完成的CPU/GPU工作量可能会显着高于非常天真的线程间通信解决方案所产生的开销.在这种情况下,一个简单的解决方案是从一个队列中拉出工作的一堆工作线程.注意这是一个单个队列而不是每个worker的一个队列.优点是这本身就是负载平衡的.每个工作人员完成它的工作,然后只轮询单生产者多用户队列.即使这是一个争用的来源,图像处理工作(秒?)应该比任何同步替代品贵得多.
  2. 一个纯IO应用程序,例如一个stats服务器,它只是为一个请求递增一些计数器:这里你几乎没有CPU繁重的工作.大多数工作只是读取字节和写入字节.多线程应用程序可能不会给您带来显着的好处.事实上,如果排队项目所花费的时间超过处理项目所需的时间,它甚至可能会减慢速度.单线程Java服务器应该能够轻松地使1G链路饱和.
  3. 需要适量处理的有状态应用程序,例如典型的业务应用程序:此处每个客户端都有一些状态,用于确定每个请求的处理方式.假设我们进行多线程,因为处理非常重要,我们可以将客户端关联到某些线程.这是actor体系结构的变体:

    i)当客户端首先将哈希值连接到工作者时.您可能希望使用某个客户端ID执行此操作,以便在断开连接并重新连接时仍将其分配给同一个worker/actor.

    ii)当读者线程读取完整的请求时,将其放在正确的工作者/演员的环形缓冲区上.由于同一个工作程序总是处理特定客户端,因此所有状态都应该是线程本地的,这使得所有处理逻辑都变得简单和单线程.

    iii)工作线程可以写出请求.总是尝试做一个write().如果您的所有数据都无法写出,那么您是否注册了OP_WRITE.如果实际存在未完成的事情,则工作线程只需要进行选择调用.大多数写入应该成功,使这不必要.这里的技巧是在选择调用和轮询环形缓冲区之间进行平衡以获得更多请求.您还可以使用单个编写器线程,其唯一的责任是写出请求.每个工作线程都可以将它的响应放在一个环形缓冲区上,将它连接到这个单一的编写器线程.单个写入器线程循环轮询每个传入的环形缓冲区并将数据写出到客户端.关于在select之前尝试写入的警告再次应用,以及关于在多个环形缓冲区和选择调用之间进行平衡的技巧.

正如您所指出的,还有许多其他选择:

Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores?

你可以这样做,但恕我直言这不是负载均衡器的最佳用途.这确实会为您购买独立的JVM,这些JVM可能会自行失败,但可能比编写多线程的单个JVM应用程序要慢.应用程序本身可能更容易编写,因为它将是单线程的.

Or I should look towards using multiple Selectors in my NIO code?
Run Code Online (Sandbox Code Playgroud)

你也可以这样做.查看Ngnix架构,了解如何执行此操作的一些提示.

Or maybe even distribute the load between multiple JVMs and use Chronicle to build an inter-process communication between them? 这也是一种选择.Chronicle为您提供了一个优势,即内存映射文件对于中间退出流程更具弹性.所有通信都通过共享内存完成,您仍然可以获得充足的性能.

Will testing on a proper serverside OS like CentOS make a big difference (maybe it is Windows that slows things down)?
Run Code Online (Sandbox Code Playgroud)

我不知道这个.不太可能.如果Java充分利用本机Windows API,那么它应该无关紧要.我非常怀疑4000万个事务/秒数(没有用户空间网络堆栈+ UDP)但我列出的架构应该做得很好.

这些体系结构往往表现良好,因为它们是单编写器体系结构,它使用基于数据结构的有界数据结构进行线程间通信.确定多线程是否是答案.在许多情况下,它不是必需的,可能导致减速.

另一个需要研究的领域是内存分配方案.具体而言,分配和重用缓冲区的策略可以带来显着的好处.正确的缓冲区重用策略取决于应用程序.看看好友内存分配,竞技场分配等方案,看看它们是否能让你受益.JVM GC对大多数工作负载都做得很好,但是在你走这条路线之前总是要测量.

协议设计对性能也有很大影响.我倾向于选择长度前缀协议,因为它们允许您分配正确大小的缓冲区,避免缓冲区列表和/或缓冲区合并.长度前缀协议还可以轻松决定何时切换请求 - 只需检查num bytes == expected.实际的解析可以由worker线程完成.序列化和反序列化扩展到长度前缀协议之外.像缓冲模式而不是分配的模式有助于此处.请看SBE的一些原则.

你可以想象整篇论文都可以写在这里.这应该让你朝着正确的方向前进.警告:始终测量并确保您需要比最简单的选项更多的性能.很容易陷入永无止境的性能改进黑洞.


use*_*421 6

你关于写作的逻辑是错误的。您应该立即尝试写入您有数据要写入的内容。如果write()返回零它是那么的时间为OP_WRITE登记,重试写入当信道变为可写,并注销了OP_WRITE,当写成功。你在这里增加了大量的延迟。通过OP_READ在执行所有操作时取消注册,您会增加更多延迟。