Linux上的Java BlockingQueue延迟很高

Joh*_*han 25 java linux multithreading latency

我正在使用BlockingQueue:s(尝试使用ArrayBlockingQueue和LinkedBlockingQueue)在我正在处理的应用程序中的不同线程之间传递对象.性能和延迟在这个应用程序中相对重要,所以我很好奇使用BlockingQueue在两个线程之间传递对象需要多长时间.为了衡量这一点,我写了一个简单的程序,有两个线程(一个消费者和一个生产者),我让生产者将时间戳(使用System.nanoTime())传递给消费者,参见下面的代码.

我记得在某个论坛上的某个地方读过,对于试过这个的人来说花了大约10微秒(不知道操作系统和硬件是什么),所以当我花了大约30微秒时,我并不感到惊讶Windows 7机箱(英特尔E7500核心2双核CPU,2.93GHz),同时在后台运行许多其他应用程序.然而,当我在速度更快的Linux服务器(两个Intel X5677 3.46GHz四核CPU,运行Debian 5和内核2.6.26-2-amd64)上进行相同的测试时,我感到非常惊讶.我预计延迟会低于我的Windows框,但相反它会高得多 - 约75 - 100微秒!两个测试都是使用Sun的Hotspot JVM版本1.6.0-23完成的.

有没有其他人在Linux上做过类似的测试?或者有人知道为什么Linux上的速度会慢得多(硬件更好),与Windows相比,Linux上的线程切换是否会慢得多?如果是这种情况,看起来Windows实际上更适合某种应用程序.任何帮助我理解相对较高的数字的帮助都非常感谢.

编辑:
在DaveC的评论之后,我还做了一个测试,我将JVM(在Linux机器上)限制为单个核心(即在同一核心上运行的所有线程).这大大改变了结果 - 延迟降至20微秒以下,即优于Windows机器上的结果.我还做了一些测试,我将生产者线程限制为一个核心,将消费者线程限制为另一个核心(尝试将它们放在同一个套接字和不同的套接字上),但这似乎没有帮助 - 延迟仍然是〜75微秒.顺便说一句,这个测试应用程序几乎就是我在执行测试时在机器上运行的所有应用程序.

有谁知道这些结果是否有意义?如果生产者和消费者在不同的核心上运行,它真的应该慢得多吗?任何输入都非常感谢.

再次编辑(1月6日):
我尝试对代码和运行环境进行不同的更改:

  1. 我将Linux内核升级到2.6.36.2(从2.6.26.2开始).内核升级后,测量时间变为60微秒,变化非常小,从升级前的75-100开始.为生产者和消费者线程设置CPU关联性没有任何影响,除非将它们限制在同一个核心.在同一核心上运行时,测得的延迟为13微秒.

  2. 在原始代码中,我让生产者在每次迭代之间进入休眠1秒钟,以便给消费者足够的时间来计算经过的时间并将其打印到控制台.如果我删除对Thread.sleep()的调用,而是让生产者和消费者在每次迭代中调用barrier.await()(消费者在将经过的时间打印到控制台后调用它),则测量的延迟从60微秒至10微秒以下.如果在同一核心上运行线程,则延迟低于1微秒.任何人都可以解释为什么这会显着减少延迟?我的第一个猜测是,这个改变产生了生成器在消费者调用queue.take()之前调用queue.put()的效果,所以消费者永远不必阻止,但在玩了一个修改版本的ArrayBlockingQueue后,我发现这个猜测是假的 - 消费者确实阻止了.如果您有其他猜测,请告诉我.(顺便说一句,如果我让生产者同时调用Thread.sleep()和barrier.await(),则延迟保持在60微秒).

  3. 我还尝试了另一种方法 - 而不是调用queue.take(),我调用了queue.poll(),超时为100微秒.这会将平均延迟降低到10微秒以下,但当然会占用更多的CPU(但繁忙等待的CPU密集程度可能更低).

再次编辑(1月10日) - 问题解决了:
ninjalj认为~60微秒的延迟是由于CPU不得不从更深的睡眠状态唤醒 - 而且他是完全正确的!在BIOS中禁用C状态后,延迟减少到<10微秒.这就解释了为什么我在上面的第2点获得了更好的延迟 - 当我更频繁地发送对象时,CPU保持足够忙,不能进入更深的睡眠状态.非常感谢所有花时间阅读我的问题并在此分享您的想法的人!

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}
Run Code Online (Sandbox Code Playgroud)

Mat*_*att 6

您的测试不是衡量队列切换延迟的一个很好的指标,因为您有一个线程读取队列,该队列System.out在再次执行之前同步写入(执行字符串和长连接).要正确测量这一点,您需要将此活动移出此线程,并在获取线程中尽可能少地工作.

你最好只在接受者中进行计算(然后现在)并将结果添加到其他集合中,该集合由另一个输出结果的线程定期排干.我倾向于通过添加到通过AtomicReference访问的适当规定的数组支持结构来实现这一点(因此报告线程只需要在该引用上使用该存储结构的另一个实例的getAndSet来获取最新批次的结果;例如make 2列表,将一个设置为活动,每个xsa线程唤醒并交换主动和被动的线程).然后,您可以报告某些分布而不是每个结果(例如,十分位数范围),这意味着您不会在每次运行时生成大量日志文件并为您打印有用的信息.

FWIW我同意Peter Lawrey所述的时代,如果延迟非常关键,那么你需要考虑忙于等待适当的cpu亲和力(即将核心专用于该线程)

1月6日后编辑

如果我删除对Thread.sleep()的调用,而是让生产者和消费者在每次迭代中调用barrier.await()(消费者在将经过的时间打印到控制台后调用它),则测量的延迟从60微秒至10微秒以下.如果在同一核心上运行线程,则延迟低于1微秒.任何人都可以解释为什么这会显着减少延迟?

你正在研究java.util.concurrent.locks.LockSupport#park(和相应的unpark)和之间的区别Thread#sleep.大多数洁悠神的东西是建立在LockSupport(通常通过AbstractQueuedSynchronizerReentrantLock提供或直接),这(在热点)解析到sun.misc.Unsafe#park(和unpark),这往往在并行线程(POSIX线程)LIB手中结束.通常pthread_cond_broadcast是醒来和/ pthread_cond_waitpthread_cond_timedwait类似的事情BlockingQueue#take.

我不能说我曾经看过如何Thread#sleep实际实现(因为我从来没有遇到过低延迟而不是基于条件的等待)但是我想这会导致它被调度程序降级比pthread信令机制更积极的方式,这是延迟差异的原因.


Pet*_*rey 4

如果可以的话,我会只使用 ArrayBlockingQueue。当我使用它时,Linux 上的延迟在 8-18 微秒之间。一些值得注意的地方。

  • 成本主要是唤醒线程所花费的时间。当您唤醒一个线程时,它的数据/代码不会在缓存中,因此您会发现,如果您对线程唤醒后发生的情况进行计时,则可能比重复运行相同的事情花费 2-5 倍的时间。
  • 某些操作使用操作系统调用(例如锁定/循环屏障),这些操作在低延迟场景中通常比忙等待更昂贵。我建议尝试忙着等待你的生产者而不是使用 CyclicBarrier。您也可以忙于等待您的消费者,但这在实际系统上可能会非常昂贵。